use std::collections::HashMap;
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use hexeract_core::CorrelationId;
use hexeract_core::HandlerContext;
use hexeract_core::MessageId;
use tokio_util::sync::CancellationToken;
use uuid::Uuid;
use crate::Event;
use crate::Handler;
use crate::OutboxEnvelope;
use crate::OutboxError;
/// A pinned, heap-allocated, type-erased future that is `Send` and may borrow for `'a`.
///
/// Used as the return type of object-safe async methods such as [`ErasedHandler::handle`].
pub type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + 'a + Send>>;
/// Persistence backend driven by [`OutboxWorker`].
///
/// Every operation runs inside an explicit transaction. The transaction is opened with
/// [`OutboxStore::begin`] on a client obtained from [`OutboxStore::acquire`] and finished
/// with [`OutboxStore::commit`]. `claim` and `mark_dead_lettered` ship with no-op default
/// bodies, so a store that lacks those capabilities can leave them out.
#[async_trait::async_trait]
pub trait OutboxStore: Send + Sync + 'static {
    /// Handle returned by `acquire`; transactions are begun by mutably borrowing it.
    type Client: Send;

    /// An open transaction that borrows its `Client` for `'tx`.
    type Tx<'tx>: Send
    where
        Self: 'tx;

    /// Obtains a client on which transactions can be opened.
    async fn acquire(&self) -> Result<Self::Client, OutboxError>;

    /// Opens a transaction on `client`.
    async fn begin<'a>(&self, client: &'a mut Self::Client) -> Result<Self::Tx<'a>, OutboxError>;

    /// Commits `tx`, consuming it.
    async fn commit<'a>(&self, tx: Self::Tx<'a>) -> Result<(), OutboxError>;

    /// Returns up to `batch_size` envelopes to dispatch.
    ///
    /// `max_attempts` is the worker's retry budget. Presumably implementations use it to
    /// exclude exhausted envelopes; confirm this per store.
    async fn poll<'a>(
        &self,
        tx: &mut Self::Tx<'a>,
        batch_size: usize,
        max_attempts: u32,
    ) -> Result<Vec<OutboxEnvelope>, OutboxError>;

    /// Hook invoked with the ids of a freshly polled batch and a lease duration.
    ///
    /// Judging by the name, implementations lease the envelopes so that concurrent workers
    /// skip them. The default body does nothing.
    async fn claim<'a>(
        &self,
        _transaction: &mut Self::Tx<'a>,
        _claimed_ids: &[Uuid],
        _lease: Duration,
    ) -> Result<(), OutboxError> {
        Ok(())
    }

    /// Records that `event_id` was handled successfully.
    async fn mark_delivered<'a>(
        &self,
        tx: &mut Self::Tx<'a>,
        event_id: Uuid,
    ) -> Result<(), OutboxError>;

    /// Records a failed dispatch of `event_id` with its error text and the backoff to
    /// wait before the next attempt.
    async fn mark_failed<'a>(
        &self,
        tx: &mut Self::Tx<'a>,
        event_id: Uuid,
        error: &str,
        retry_in: Duration,
    ) -> Result<(), OutboxError>;

    /// Records that `event_id` exhausted its retry budget. The default body does nothing.
    async fn mark_dead_lettered<'a>(
        &self,
        _transaction: &mut Self::Tx<'a>,
        _dead_id: Uuid,
        _last_error: &str,
    ) -> Result<(), OutboxError> {
        Ok(())
    }
}
/// Object-safe handler interface stored in the worker's registry.
///
/// [`TypedHandler`] implements it by decoding the envelope payload into a concrete event
/// type before delegating to a typed handler.
pub trait ErasedHandler: Send + Sync + 'static {
    /// Processes `envelope` under `ctx`, borrowing all three for the future's lifetime.
    fn handle<'a>(
        &'a self,
        envelope: &'a OutboxEnvelope,
        ctx: &'a HandlerContext,
    ) -> BoxFuture<'a, Result<(), OutboxError>>;

    /// The `event_type` string this handler serves.
    fn event_type(&self) -> &'static str;
}
/// Adapts a typed [`Handler<E>`] to the object-safe [`ErasedHandler`] interface.
///
/// The envelope payload is decoded into `E` before the inner handler runs. Both decode
/// failures and handler errors are returned as [`OutboxError`].
pub struct TypedHandler<E: Event, H: Handler<E>> {
    // Held behind an `Arc` so one handler instance can back several adapters (`shared`).
    inner: Arc<H>,
    // `fn() -> E` ties the adapter to `E` without owning an `E`, so auto traits such as
    // `Send`/`Sync` do not depend on `E`.
    _event: PhantomData<fn() -> E>,
}

impl<E: Event, H: Handler<E>> TypedHandler<E, H> {
    /// Wraps an owned handler.
    #[must_use]
    pub fn new(handler: H) -> Self {
        Self::shared(Arc::new(handler))
    }

    /// Wraps a handler that is already shared behind an [`Arc`].
    #[must_use]
    pub fn shared(handler: Arc<H>) -> Self {
        Self {
            inner: handler,
            _event: PhantomData,
        }
    }
}

impl<E: Event, H: Handler<E>> ErasedHandler for TypedHandler<E, H> {
    fn event_type(&self) -> &'static str {
        E::EVENT_TYPE
    }

    fn handle<'a>(
        &'a self,
        envelope: &'a OutboxEnvelope,
        ctx: &'a HandlerContext,
    ) -> BoxFuture<'a, Result<(), OutboxError>> {
        // The async block is passed straight to `Box::pin`, so its output type is
        // inferred from the boxed return type. That inference is what makes `?` and
        // `Into::into` resolve to `OutboxError`.
        Box::pin(async move {
            let event: E = envelope.decode()?;
            let outcome = self.inner.handle(event, ctx).await;
            outcome.map_err(Into::into)
        })
    }
}
/// Tuning knobs for [`OutboxWorker`].
#[derive(Clone, Debug)]
pub struct OutboxWorkerConfig {
    /// Sleep after a cycle that found no envelopes, or after a cycle that failed.
    pub poll_interval: Duration,
    /// Maximum number of envelopes requested per poll.
    pub batch_size: usize,
    /// Retry budget.
    ///
    /// It is passed to the store's `poll`. A failure that brings an envelope to this many
    /// attempts dead-letters it.
    pub max_attempts: u32,
    /// Backoff before the first retry; doubled for each previous attempt.
    pub retry_base_delay: Duration,
    /// Upper bound on the retry backoff.
    pub retry_max_delay: Duration,
    /// When set, the retry delay is drawn at random from zero up to the computed backoff.
    pub jitter: bool,
    /// Pause between back-to-back non-empty cycles; zero re-polls immediately.
    pub min_cycle_delay: Duration,
    /// Time limit for a single handler invocation.
    ///
    /// It is also the per-envelope unit of the claim lease.
    pub dispatch_timeout: Duration,
}

impl Default for OutboxWorkerConfig {
    fn default() -> Self {
        let millis = Duration::from_millis;
        let secs = Duration::from_secs;
        Self {
            poll_interval: millis(100),
            batch_size: 10,
            max_attempts: 5,
            retry_base_delay: secs(1),
            retry_max_delay: secs(300),
            jitter: true,
            min_cycle_delay: millis(5),
            dispatch_timeout: secs(30),
        }
    }
}
impl OutboxWorkerConfig {
    /// Backoff to wait before retrying an envelope whose stored attempt counter is `attempts`.
    ///
    /// The un-jittered value is `retry_base_delay * 2^attempts`. It saturates instead of
    /// overflowing and is capped at `retry_max_delay`. When `jitter` is enabled, the
    /// result is drawn from `0..=cap` ("full jitter"), so it may be shorter than the
    /// cap.
    #[must_use]
    pub fn next_retry_delay(&self, attempts: u32) -> Duration {
        // 2^attempts overflows u32 from attempts = 32 upwards; saturate to u32::MAX there.
        let multiplier = 2u32.checked_pow(attempts).unwrap_or(u32::MAX);
        let cap = std::cmp::min(
            self.retry_base_delay.saturating_mul(multiplier),
            self.retry_max_delay,
        );
        if !self.jitter {
            return cap;
        }
        // `as_nanos` yields u128; clamp it into the u64 range fastrand works with.
        let upper_nanos = u64::try_from(cap.as_nanos()).unwrap_or(u64::MAX);
        Duration::from_nanos(fastrand::u64(0..=upper_nanos))
    }
}
/// Background worker that drains the transactional outbox.
///
/// Each cycle polls and claims a batch of envelopes in one short transaction, commits it,
/// and then dispatches the envelopes one by one. Each dispatch goes to the handler
/// registered for the envelope's `event_type`, and its outcome is settled in a separate
/// transaction.
pub struct OutboxWorker<S>
where
    S: OutboxStore,
{
    store: S,
    /// Handlers, looked up by the envelope's `event_type` string.
    handlers: Arc<HashMap<&'static str, Arc<dyn ErasedHandler>>>,
    config: OutboxWorkerConfig,
}

impl<S> OutboxWorker<S>
where
    S: OutboxStore,
{
    /// Creates a worker over `store` that dispatches to `handlers`.
    ///
    /// `handlers` maps an envelope's `event_type` to the handler that processes it.
    /// Envelopes whose type has no entry fail with [`OutboxError::MissingHandler`].
    #[must_use]
    pub fn new(
        store: S,
        handlers: HashMap<&'static str, Arc<dyn ErasedHandler>>,
        config: OutboxWorkerConfig,
    ) -> Self {
        Self {
            store,
            handlers: Arc::new(handlers),
            config,
        }
    }

    /// Runs the poll → dispatch → settle loop until `cancel` fires.
    ///
    /// The loop paces itself as follows:
    /// - After an empty cycle, or a failed one (which is logged), it sleeps `poll_interval`.
    /// - After a non-empty cycle it sleeps `min_cycle_delay`, or re-polls immediately when
    ///   that is zero.
    /// - Sleeps are interrupted by cancellation.
    ///
    /// A cycle in progress is not aborted. The envelope currently being dispatched is
    /// settled, but no further envelopes from the batch are dispatched once cancellation
    /// is observed.
    ///
    /// # Errors
    ///
    /// The returned future currently always resolves to `Ok(())`. Poll-cycle errors are
    /// logged and retried rather than returned.
    pub fn run(
        self,
        cancel: CancellationToken,
    ) -> Pin<Box<dyn Future<Output = Result<(), OutboxError>> + Send>>
    where
        for<'a> S::Tx<'a>: Send,
    {
        Box::pin(async move {
            while !cancel.is_cancelled() {
                let sleep_for = match self.poll_cycle(&cancel).await {
                    Ok(0) => Some(self.config.poll_interval),
                    Ok(_) => {
                        if self.config.min_cycle_delay.is_zero() {
                            None
                        } else {
                            Some(self.config.min_cycle_delay)
                        }
                    }
                    Err(err) => {
                        tracing::error!(
                            error = ?err,
                            "outbox worker poll cycle failed, sleeping before retry"
                        );
                        Some(self.config.poll_interval)
                    }
                };
                if let Some(delay) = sleep_for {
                    tokio::select! {
                        () = tokio::time::sleep(delay) => {}
                        () = cancel.cancelled() => break,
                    }
                }
            }
            Ok(())
        })
    }

    /// Runs one cycle: claims a batch, then dispatches and settles each envelope in order.
    ///
    /// Returns the number of envelopes polled, including any left undispatched because of
    /// cancellation; `run` uses it to pick its pacing delay. A failure to settle one
    /// envelope is logged and does not abort the rest of the batch.
    ///
    /// # Errors
    ///
    /// Any error from polling, claiming or committing the batch.
    async fn poll_cycle(&self, cancel: &CancellationToken) -> Result<usize, OutboxError> {
        let envelopes = self.claim_batch().await?;
        let count = envelopes.len();
        for (dispatched, envelope) in envelopes.iter().enumerate() {
            // Stop handing out new work once shutdown has been requested. Otherwise each
            // remaining envelope would run with an already-cancelled context. Cooperative
            // handlers would fail, `mark_failed` would record the failure, and retry budget
            // would be burnt (possibly dead-lettering the envelope). Skipped envelopes stay
            // claimed and should become pollable again once the lease from `lease_for`
            // expires; this depends on the store's `claim` implementation.
            if cancel.is_cancelled() {
                tracing::info!(
                    skipped = count - dispatched,
                    "outbox worker cancelled mid-batch; leaving remaining envelopes undispatched"
                );
                break;
            }
            if let Err(err) = self.dispatch_and_settle(envelope, cancel).await {
                tracing::error!(
                    event_id = %envelope.event_id,
                    event_type = %envelope.event_type,
                    error = ?err,
                    "failed to settle outbox envelope; continuing with the rest of the batch"
                );
            }
        }
        Ok(count)
    }

    /// Polls up to `batch_size` envelopes and claims them in a single transaction.
    ///
    /// The transaction is committed before returning, so the client is released before
    /// any handler runs.
    ///
    /// # Errors
    ///
    /// Any store error. The transaction is then dropped without being committed.
    async fn claim_batch(&self) -> Result<Vec<OutboxEnvelope>, OutboxError> {
        let mut client = self.store.acquire().await?;
        let mut tx = self.store.begin(&mut client).await?;
        let batch = self
            .store
            .poll(&mut tx, self.config.batch_size, self.config.max_attempts)
            .await?;
        if !batch.is_empty() {
            let ids: Vec<Uuid> = batch.iter().map(|e| e.event_id).collect();
            self.store
                .claim(&mut tx, &ids, self.lease_for(ids.len()))
                .await?;
        }
        self.store.commit(tx).await?;
        Ok(batch)
    }

    /// Lease requested for a batch of `batch_len` envelopes.
    ///
    /// The lease is one `dispatch_timeout` per envelope, with a minimum of one, because
    /// envelopes are dispatched sequentially. It saturates instead of overflowing.
    /// Time spent in the per-envelope settle transactions is not budgeted.
    fn lease_for(&self, batch_len: usize) -> Duration {
        let factor = u32::try_from(batch_len.max(1)).unwrap_or(u32::MAX);
        self.config.dispatch_timeout.saturating_mul(factor)
    }

    /// Dispatches `envelope` and records the outcome in its own transaction.
    ///
    /// - On success the envelope is marked delivered.
    /// - On failure it is marked failed with a backoff from
    ///   [`OutboxWorkerConfig::next_retry_delay`].
    /// - If this failure reaches `max_attempts`, the envelope is also dead-lettered in the
    ///   same transaction.
    ///
    /// # Errors
    ///
    /// Store errors while settling. Handler errors are not returned; they are recorded via
    /// `mark_failed`.
    async fn dispatch_and_settle(
        &self,
        envelope: &OutboxEnvelope,
        cancel: &CancellationToken,
    ) -> Result<(), OutboxError> {
        match self.dispatch(envelope, cancel).await {
            Ok(()) => {
                let mut client = self.store.acquire().await?;
                let mut tx = self.store.begin(&mut client).await?;
                self.store
                    .mark_delivered(&mut tx, envelope.event_id)
                    .await?;
                self.store.commit(tx).await?;
            }
            Err(err) => {
                let message = err.to_string();
                tracing::warn!(
                    event_id = %envelope.event_id,
                    event_type = %envelope.event_type,
                    error = %message,
                    "outbox handler dispatch failed"
                );
                let retry_in = self.config.next_retry_delay(envelope.attempts);
                // This failure counts as attempt number `attempts + 1`. Saturate so that an
                // out-of-range stored counter cannot overflow (which panics in debug builds).
                let attempt_number = envelope.attempts.saturating_add(1);
                let mut client = self.store.acquire().await?;
                let mut tx = self.store.begin(&mut client).await?;
                self.store
                    .mark_failed(&mut tx, envelope.event_id, &message, retry_in)
                    .await?;
                if attempt_number >= self.config.max_attempts {
                    tracing::error!(
                        event_id = %envelope.event_id,
                        event_type = %envelope.event_type,
                        attempts = attempt_number,
                        "outbox envelope exhausted retry budget, moving to dead letter"
                    );
                    self.store
                        .mark_dead_lettered(&mut tx, envelope.event_id, &message)
                        .await?;
                }
                self.store.commit(tx).await?;
            }
        }
        Ok(())
    }

    /// Runs the registered handler for `envelope` under `dispatch_timeout`.
    ///
    /// The context's message and correlation ids are both derived from `event_id`, so they
    /// stay identical across retries of the same envelope. NOTE(review): confirm that
    /// deriving the correlation id from `event_id` (rather than an upstream correlation id)
    /// is intended.
    ///
    /// The context's cancellation token is a child of `cancel`. It is also cancelled
    /// explicitly when the timeout fires, so work that holds a clone of it can stop.
    ///
    /// # Errors
    ///
    /// - [`OutboxError::MissingHandler`] when no handler is registered for the event type.
    /// - [`OutboxError::DispatchTimeout`] when the handler exceeds `dispatch_timeout`.
    /// - Otherwise, the handler's own error.
    async fn dispatch(
        &self,
        envelope: &OutboxEnvelope,
        cancel: &CancellationToken,
    ) -> Result<(), OutboxError> {
        let Some(handler) = self.handlers.get(envelope.event_type.as_str()) else {
            return Err(OutboxError::MissingHandler {
                event_type: envelope.event_type.clone(),
            });
        };
        let ctx = HandlerContext::new(
            MessageId::from(envelope.event_id),
            CorrelationId::from(envelope.event_id),
        )
        .with_cancellation(cancel.child_token());
        tracing::debug!(
            event_id = %envelope.event_id,
            event_type = %envelope.event_type,
            "dispatching outbox envelope"
        );
        match tokio::time::timeout(self.config.dispatch_timeout, handler.handle(envelope, &ctx))
            .await
        {
            Ok(result) => result,
            Err(_elapsed) => {
                ctx.cancellation.cancel();
                Err(OutboxError::DispatchTimeout {
                    event_id: envelope.event_id,
                    event_type: envelope.event_type.clone(),
                    timeout: self.config.dispatch_timeout,
                })
            }
        }
    }
}
#[cfg(test)]
mod tests {
    // Unit tests for the typed-handler adapter, the retry backoff and the worker loop.
    // Worker tests spawn `OutboxWorker::run` against in-memory `OutboxStore` mocks, cancel
    // it, and then inspect what the mocks recorded.
    use super::*;
    use serde::Deserialize;
    use serde::Serialize;
    use std::sync::Mutex;
    use std::sync::atomic::AtomicBool;
    use std::sync::atomic::Ordering;

    /// Minimal test event carrying a single user id.
    #[derive(Debug, Serialize, Deserialize, PartialEq)]
    struct UserRegistered {
        user_id: Uuid,
    }

    impl Event for UserRegistered {
        const EVENT_TYPE: &'static str = "users.registered";
    }

    /// Handler that appends the `user_id` of every event it receives to `seen` and succeeds.
    struct RecordingHandler {
        seen: Arc<Mutex<Vec<Uuid>>>,
    }

    impl Handler<UserRegistered> for RecordingHandler {
        type Error = OutboxError;
        async fn handle(
            &self,
            event: UserRegistered,
            _ctx: &HandlerContext,
        ) -> Result<(), Self::Error> {
            self.seen.lock().unwrap().push(event.user_id);
            Ok(())
        }
    }

    /// Handler that records the `message_id` of each `HandlerContext` it is given.
    struct ContextCapturingHandler {
        captured_ids: Arc<Mutex<Vec<MessageId>>>,
    }

    impl Handler<UserRegistered> for ContextCapturingHandler {
        type Error = OutboxError;
        async fn handle(
            &self,
            _event: UserRegistered,
            ctx: &HandlerContext,
        ) -> Result<(), Self::Error> {
            self.captured_ids.lock().unwrap().push(ctx.message_id);
            Ok(())
        }
    }

    /// Handler that always fails with `OutboxError::Internal("forced")`.
    struct FailingHandler;

    impl Handler<UserRegistered> for FailingHandler {
        type Error = OutboxError;
        async fn handle(
            &self,
            _event: UserRegistered,
            _ctx: &HandlerContext,
        ) -> Result<(), Self::Error> {
            Err(OutboxError::Internal("forced".into()))
        }
    }

    /// Builds an envelope with a random (v4) event id wrapping a `UserRegistered { user_id }`.
    fn fresh_envelope(user_id: Uuid) -> OutboxEnvelope {
        let publisher_test_event = UserRegistered { user_id };
        OutboxEnvelope::new(Uuid::new_v4(), &publisher_test_event).unwrap()
    }

    /// The erased adapter decodes the payload and forwards the event to the typed handler.
    #[tokio::test]
    async fn typed_handler_decodes_envelope_and_calls_inner_handler() {
        let seen = Arc::new(Mutex::new(Vec::<Uuid>::new()));
        let handler = TypedHandler::new(RecordingHandler {
            seen: Arc::clone(&seen),
        });
        let erased: Arc<dyn ErasedHandler> = Arc::new(handler);
        let user_id = Uuid::from_u128(42);
        let envelope = fresh_envelope(user_id);
        let ctx = HandlerContext::new(MessageId::new(), CorrelationId::new());
        erased
            .handle(&envelope, &ctx)
            .await
            .expect("erased dispatch must succeed");
        assert_eq!(seen.lock().unwrap().as_slice(), &[user_id]);
    }

    /// A handler error comes back through the erased interface as `OutboxError::Internal`.
    #[tokio::test]
    async fn typed_handler_propagates_handler_error_as_outbox_error() {
        let handler = TypedHandler::new(FailingHandler);
        let erased: Arc<dyn ErasedHandler> = Arc::new(handler);
        let envelope = fresh_envelope(Uuid::nil());
        let ctx = HandlerContext::new(MessageId::new(), CorrelationId::new());
        let err = erased.handle(&envelope, &ctx).await.expect_err("must fail");
        assert!(matches!(err, OutboxError::Internal(_)));
    }

    /// `event_type` comes from the event's `EVENT_TYPE` constant.
    #[test]
    fn typed_handler_reports_event_type_from_const() {
        let handler = TypedHandler::new(RecordingHandler {
            seen: Arc::new(Mutex::new(Vec::new())),
        });
        assert_eq!(handler.event_type(), "users.registered");
    }

    /// Pins every field of `OutboxWorkerConfig::default()`.
    #[test]
    fn default_config_has_expected_values() {
        let cfg = OutboxWorkerConfig::default();
        assert_eq!(cfg.poll_interval, Duration::from_millis(100));
        assert_eq!(cfg.batch_size, 10);
        assert_eq!(cfg.max_attempts, 5);
        assert_eq!(cfg.retry_base_delay, Duration::from_secs(1));
        assert_eq!(cfg.retry_max_delay, Duration::from_secs(300));
        assert!(cfg.jitter);
        assert_eq!(cfg.min_cycle_delay, Duration::from_millis(5));
        assert_eq!(cfg.dispatch_timeout, Duration::from_secs(30));
    }

    /// Default config with the given backoff bounds and jitter disabled, so delays are exact.
    fn deterministic_config(base: Duration, max: Duration) -> OutboxWorkerConfig {
        OutboxWorkerConfig {
            retry_base_delay: base,
            retry_max_delay: max,
            jitter: false,
            ..OutboxWorkerConfig::default()
        }
    }

    /// Without jitter the delay doubles per attempt, starting at the base delay.
    #[test]
    fn backoff_grows_exponentially_without_jitter() {
        let cfg = deterministic_config(Duration::from_secs(1), Duration::from_secs(300));
        assert_eq!(cfg.next_retry_delay(0), Duration::from_secs(1));
        assert_eq!(cfg.next_retry_delay(1), Duration::from_secs(2));
        assert_eq!(cfg.next_retry_delay(2), Duration::from_secs(4));
        assert_eq!(cfg.next_retry_delay(3), Duration::from_secs(8));
    }

    /// Large attempt counts are clamped to `retry_max_delay`.
    #[test]
    fn backoff_caps_at_max_delay() {
        let cfg = deterministic_config(Duration::from_secs(1), Duration::from_secs(30));
        assert_eq!(cfg.next_retry_delay(10), Duration::from_secs(30));
        assert_eq!(cfg.next_retry_delay(100), Duration::from_secs(30));
    }

    /// A shift amount beyond u32's width must saturate rather than panic or wrap.
    #[test]
    fn backoff_overflow_safe_for_large_attempts() {
        let cfg = deterministic_config(Duration::from_secs(1), Duration::from_secs(300));
        let delay = cfg.next_retry_delay(64);
        assert_eq!(
            delay,
            Duration::from_secs(300),
            "overflow must saturate at cap"
        );
    }

    /// With jitter enabled the delay never exceeds the un-jittered, capped backoff.
    #[test]
    fn backoff_jitter_stays_within_bounds() {
        let base = Duration::from_secs(1);
        let max = Duration::from_secs(30);
        let cfg = OutboxWorkerConfig {
            retry_base_delay: base,
            retry_max_delay: max,
            jitter: true,
            ..OutboxWorkerConfig::default()
        };
        for attempts in 0u32..8 {
            let delay = cfg.next_retry_delay(attempts);
            // Recompute the cap the same way `next_retry_delay` does.
            let cap = base
                .saturating_mul(1u32.checked_shl(attempts).unwrap_or(u32::MAX))
                .min(max);
            assert!(
                delay <= cap,
                "attempt {attempts}: jittered delay {delay:?} must be <= cap {cap:?}"
            );
        }
    }

    /// Store that hands out `pending` envelopes and records the (tokio clock) instant of
    /// the first poll that returns nothing.
    #[derive(Clone)]
    struct PacingStore {
        pending: Arc<Mutex<Vec<OutboxEnvelope>>>,
        empty_poll_at: Arc<Mutex<Option<tokio::time::Instant>>>,
    }

    impl PacingStore {
        fn new(initial: Vec<OutboxEnvelope>) -> Self {
            Self {
                pending: Arc::new(Mutex::new(initial)),
                empty_poll_at: Arc::new(Mutex::new(None)),
            }
        }
    }

    #[async_trait::async_trait]
    impl OutboxStore for PacingStore {
        type Client = MockClient;
        type Tx<'tx> = MockTx;
        async fn acquire(&self) -> Result<Self::Client, OutboxError> {
            Ok(MockClient)
        }
        async fn begin<'a>(
            &self,
            _client: &'a mut Self::Client,
        ) -> Result<Self::Tx<'a>, OutboxError> {
            Ok(MockTx)
        }
        async fn poll<'a>(
            &self,
            _tx: &mut Self::Tx<'a>,
            batch_size: usize,
            _max_attempts: u32,
        ) -> Result<Vec<OutboxEnvelope>, OutboxError> {
            let mut pending = self.pending.lock().unwrap();
            let take = batch_size.min(pending.len());
            let batch: Vec<OutboxEnvelope> = pending.drain(..take).collect();
            if batch.is_empty() {
                // Only the first empty poll is recorded; later ones leave the slot untouched.
                let mut slot = self.empty_poll_at.lock().unwrap();
                if slot.is_none() {
                    *slot = Some(tokio::time::Instant::now());
                }
            }
            Ok(batch)
        }
        async fn mark_delivered<'a>(
            &self,
            _tx: &mut Self::Tx<'a>,
            _event_id: Uuid,
        ) -> Result<(), OutboxError> {
            Ok(())
        }
        async fn mark_failed<'a>(
            &self,
            _tx: &mut Self::Tx<'a>,
            _event_id: Uuid,
            _error: &str,
            _retry_in: Duration,
        ) -> Result<(), OutboxError> {
            Ok(())
        }
        async fn commit<'a>(&self, _tx: Self::Tx<'a>) -> Result<(), OutboxError> {
            Ok(())
        }
    }

    /// On a paused clock, every non-empty cycle must be followed by exactly one
    /// `min_cycle_delay` pause.
    ///
    /// With `batch_size` 1 and four envelopes, the first four cycles are non-empty, so the
    /// first empty poll must occur at exactly 4 × delay.
    #[tokio::test(start_paused = true)]
    async fn run_paces_consecutive_non_empty_cycles() {
        let non_empty_cycles: u32 = 4;
        let envelopes: Vec<OutboxEnvelope> = (0..non_empty_cycles)
            .map(|i| fresh_envelope(Uuid::from_u128(u128::from(i) + 1)))
            .collect();
        let store = PacingStore::new(envelopes);
        let empty_poll_at = Arc::clone(&store.empty_poll_at);
        let delay = Duration::from_millis(10);
        let config = OutboxWorkerConfig {
            // Long idle interval so only `min_cycle_delay` pacing is observed before cancel.
            poll_interval: Duration::from_secs(3600),
            batch_size: 1,
            min_cycle_delay: delay,
            ..OutboxWorkerConfig::default()
        };
        let registry = registry_with(vec![Arc::new(TypedHandler::new(RecordingHandler {
            seen: Arc::new(Mutex::new(Vec::new())),
        }))]);
        let worker = OutboxWorker::new(store, registry, config);
        let cancel = CancellationToken::new();
        let start = tokio::time::Instant::now();
        let join = tokio::spawn(worker.run(cancel.clone()));
        tokio::time::sleep(delay * non_empty_cycles + Duration::from_millis(1)).await;
        cancel.cancel();
        join.await.unwrap().unwrap();
        let empty_at = empty_poll_at
            .lock()
            .unwrap()
            .expect("the loop should have reached the empty poll");
        assert_eq!(
            empty_at.duration_since(start),
            delay * non_empty_cycles,
            "each non-empty cycle must be paced by min_cycle_delay before the empty poll"
        );
    }

    /// In-memory store that records every settle/claim call for later assertions.
    ///
    /// - `fail_claim` makes `claim` return an error.
    /// - `fail_mark_delivered_for` makes the next `mark_delivered` for that id fail once
    ///   with `PoolTimeout`.
    #[derive(Clone)]
    struct MockStore {
        pending: Arc<Mutex<Vec<OutboxEnvelope>>>,
        delivered: Arc<Mutex<Vec<Uuid>>>,
        failed: Arc<Mutex<Vec<(Uuid, String)>>>,
        dead_lettered: Arc<Mutex<Vec<Uuid>>>,
        claimed: Arc<Mutex<Vec<Uuid>>>,
        fail_claim: Arc<AtomicBool>,
        fail_mark_delivered_for: Arc<Mutex<Option<Uuid>>>,
    }

    impl MockStore {
        fn new(initial: Vec<OutboxEnvelope>) -> Self {
            Self {
                pending: Arc::new(Mutex::new(initial)),
                delivered: Arc::new(Mutex::new(Vec::new())),
                failed: Arc::new(Mutex::new(Vec::new())),
                dead_lettered: Arc::new(Mutex::new(Vec::new())),
                claimed: Arc::new(Mutex::new(Vec::new())),
                fail_claim: Arc::new(AtomicBool::new(false)),
                fail_mark_delivered_for: Arc::new(Mutex::new(None)),
            }
        }
    }

    /// Placeholder client shared by `PacingStore` and `MockStore`.
    struct MockClient;
    /// Placeholder transaction shared by `PacingStore` and `MockStore`.
    struct MockTx;

    #[async_trait::async_trait]
    impl OutboxStore for MockStore {
        type Client = MockClient;
        type Tx<'tx> = MockTx;
        async fn acquire(&self) -> Result<Self::Client, OutboxError> {
            Ok(MockClient)
        }
        async fn begin<'a>(
            &self,
            _client: &'a mut Self::Client,
        ) -> Result<Self::Tx<'a>, OutboxError> {
            Ok(MockTx)
        }
        async fn poll<'a>(
            &self,
            _tx: &mut Self::Tx<'a>,
            batch_size: usize,
            _max_attempts: u32,
        ) -> Result<Vec<OutboxEnvelope>, OutboxError> {
            // Polled envelopes are removed for good, so nothing is ever redelivered.
            let mut pending = self.pending.lock().unwrap();
            let take = batch_size.min(pending.len());
            Ok(pending.drain(..take).collect())
        }
        async fn mark_delivered<'a>(
            &self,
            _tx: &mut Self::Tx<'a>,
            event_id: Uuid,
        ) -> Result<(), OutboxError> {
            {
                // One-shot injected failure: clear the slot so a later call would succeed.
                let mut slot = self.fail_mark_delivered_for.lock().unwrap();
                if *slot == Some(event_id) {
                    *slot = None;
                    return Err(OutboxError::PoolTimeout);
                }
            }
            self.delivered.lock().unwrap().push(event_id);
            Ok(())
        }
        async fn mark_failed<'a>(
            &self,
            _tx: &mut Self::Tx<'a>,
            event_id: Uuid,
            error: &str,
            _retry_in: Duration,
        ) -> Result<(), OutboxError> {
            self.failed
                .lock()
                .unwrap()
                .push((event_id, error.to_owned()));
            Ok(())
        }
        async fn mark_dead_lettered<'a>(
            &self,
            _tx: &mut Self::Tx<'a>,
            event_id: Uuid,
            _error: &str,
        ) -> Result<(), OutboxError> {
            self.dead_lettered.lock().unwrap().push(event_id);
            Ok(())
        }
        async fn commit<'a>(&self, _tx: Self::Tx<'a>) -> Result<(), OutboxError> {
            Ok(())
        }
        async fn claim<'a>(
            &self,
            _tx: &mut Self::Tx<'a>,
            event_ids: &[Uuid],
            _lease_for: Duration,
        ) -> Result<(), OutboxError> {
            if self.fail_claim.load(Ordering::Relaxed) {
                return Err(OutboxError::Internal("claim failed".into()));
            }
            self.claimed.lock().unwrap().extend_from_slice(event_ids);
            Ok(())
        }
    }

    /// Builds a handler registry keyed by each handler's `event_type()`.
    fn registry_with(
        handlers: Vec<Arc<dyn ErasedHandler>>,
    ) -> HashMap<&'static str, Arc<dyn ErasedHandler>> {
        let mut map = HashMap::new();
        for handler in handlers {
            map.insert(handler.event_type(), handler);
        }
        map
    }

    // The worker tests below run on the real clock. They assume that 200ms (150ms in one
    // case) is long enough for the worker to drain the mock store before it is cancelled.

    /// Successful handling marks each envelope delivered, in poll order.
    #[tokio::test]
    async fn worker_dispatches_pending_envelopes_and_marks_delivered() {
        let envelopes = vec![
            fresh_envelope(Uuid::from_u128(1)),
            fresh_envelope(Uuid::from_u128(2)),
        ];
        let event_ids: Vec<Uuid> = envelopes.iter().map(|e| e.event_id).collect();
        let store = MockStore::new(envelopes);
        let seen = Arc::new(Mutex::new(Vec::new()));
        let handler: Arc<dyn ErasedHandler> = Arc::new(TypedHandler::new(RecordingHandler {
            seen: Arc::clone(&seen),
        }));
        let registry = registry_with(vec![handler]);
        let worker = OutboxWorker::new(store.clone(), registry, OutboxWorkerConfig::default());
        let cancel = CancellationToken::new();
        let join = tokio::spawn(worker.run(cancel.clone()));
        tokio::time::sleep(Duration::from_millis(200)).await;
        cancel.cancel();
        join.await.unwrap().unwrap();
        assert_eq!(seen.lock().unwrap().len(), 2);
        assert_eq!(
            store.delivered.lock().unwrap().as_slice(),
            event_ids.as_slice()
        );
        assert!(store.failed.lock().unwrap().is_empty());
    }

    /// A handler error is recorded via `mark_failed` with the error's message.
    #[tokio::test]
    async fn worker_marks_failed_when_handler_errors() {
        let envelope = fresh_envelope(Uuid::from_u128(1));
        let event_id = envelope.event_id;
        let store = MockStore::new(vec![envelope]);
        let handler: Arc<dyn ErasedHandler> = Arc::new(TypedHandler::new(FailingHandler));
        let registry = registry_with(vec![handler]);
        let worker = OutboxWorker::new(store.clone(), registry, OutboxWorkerConfig::default());
        let cancel = CancellationToken::new();
        let join = tokio::spawn(worker.run(cancel.clone()));
        tokio::time::sleep(Duration::from_millis(200)).await;
        cancel.cancel();
        join.await.unwrap().unwrap();
        assert!(store.delivered.lock().unwrap().is_empty());
        let failed = store.failed.lock().unwrap();
        assert_eq!(failed.len(), 1);
        assert_eq!(failed[0].0, event_id);
        assert!(failed[0].1.contains("forced"));
    }

    /// An unregistered event type is recorded as a failure rather than skipped.
    #[tokio::test]
    async fn worker_marks_failed_when_no_handler_registered() {
        let envelope = fresh_envelope(Uuid::from_u128(1));
        let event_id = envelope.event_id;
        let store = MockStore::new(vec![envelope]);
        let registry = HashMap::new();
        let worker = OutboxWorker::new(store.clone(), registry, OutboxWorkerConfig::default());
        let cancel = CancellationToken::new();
        let join = tokio::spawn(worker.run(cancel.clone()));
        tokio::time::sleep(Duration::from_millis(200)).await;
        cancel.cancel();
        join.await.unwrap().unwrap();
        let failed = store.failed.lock().unwrap();
        assert_eq!(failed.len(), 1);
        assert_eq!(failed[0].0, event_id);
        assert!(failed[0].1.contains("no handler"));
    }

    /// With `max_attempts` 1, the first failure both marks failed and dead-letters.
    #[tokio::test]
    async fn worker_dead_letters_envelope_when_max_attempts_exhausted() {
        let envelope = fresh_envelope(Uuid::from_u128(1));
        let event_id = envelope.event_id;
        let store = MockStore::new(vec![envelope]);
        let handler: Arc<dyn ErasedHandler> = Arc::new(TypedHandler::new(FailingHandler));
        let registry = registry_with(vec![handler]);
        let config = OutboxWorkerConfig {
            max_attempts: 1,
            batch_size: 1,
            ..OutboxWorkerConfig::default()
        };
        let worker = OutboxWorker::new(store.clone(), registry, config);
        let cancel = CancellationToken::new();
        let join = tokio::spawn(worker.run(cancel.clone()));
        tokio::time::sleep(Duration::from_millis(200)).await;
        cancel.cancel();
        join.await.unwrap().unwrap();
        let failed = store.failed.lock().unwrap();
        assert_eq!(failed.len(), 1);
        assert_eq!(failed[0].0, event_id);
        let dead = store.dead_lettered.lock().unwrap();
        assert_eq!(dead.as_slice(), &[event_id]);
    }

    /// A first failure with budget remaining is marked failed but not dead-lettered.
    #[tokio::test]
    async fn worker_does_not_dead_letter_before_attempts_exhausted() {
        let envelope = fresh_envelope(Uuid::from_u128(1));
        let event_id = envelope.event_id;
        let store = MockStore::new(vec![envelope]);
        let handler: Arc<dyn ErasedHandler> = Arc::new(TypedHandler::new(FailingHandler));
        let registry = registry_with(vec![handler]);
        let config = OutboxWorkerConfig {
            max_attempts: 3,
            batch_size: 1,
            ..OutboxWorkerConfig::default()
        };
        let worker = OutboxWorker::new(store.clone(), registry, config);
        let cancel = CancellationToken::new();
        let join = tokio::spawn(worker.run(cancel.clone()));
        tokio::time::sleep(Duration::from_millis(200)).await;
        cancel.cancel();
        join.await.unwrap().unwrap();
        let failed = store.failed.lock().unwrap();
        assert_eq!(failed.len(), 1);
        assert_eq!(failed[0].0, event_id);
        assert!(
            store.dead_lettered.lock().unwrap().is_empty(),
            "should not dead-letter when attempts(1) < max_attempts(3)"
        );
    }

    /// `claim` receives every polled id, in poll order.
    #[tokio::test]
    async fn worker_claims_envelopes_before_dispatch() {
        let envelopes = vec![
            fresh_envelope(Uuid::from_u128(1)),
            fresh_envelope(Uuid::from_u128(2)),
        ];
        let expected_ids: Vec<Uuid> = envelopes.iter().map(|e| e.event_id).collect();
        let store = MockStore::new(envelopes);
        let seen = Arc::new(Mutex::new(Vec::new()));
        let handler: Arc<dyn ErasedHandler> = Arc::new(TypedHandler::new(RecordingHandler {
            seen: Arc::clone(&seen),
        }));
        let registry = registry_with(vec![handler]);
        let worker = OutboxWorker::new(store.clone(), registry, OutboxWorkerConfig::default());
        let cancel = CancellationToken::new();
        let join = tokio::spawn(worker.run(cancel.clone()));
        tokio::time::sleep(Duration::from_millis(200)).await;
        cancel.cancel();
        join.await.unwrap().unwrap();
        let claimed = store.claimed.lock().unwrap();
        assert_eq!(
            claimed.as_slice(),
            expected_ids.as_slice(),
            "claim must be called with all polled ids"
        );
    }

    /// Empty polls never call `claim`.
    #[tokio::test]
    async fn worker_does_not_claim_when_batch_is_empty() {
        let store = MockStore::new(Vec::new());
        let registry = HashMap::new();
        let worker = OutboxWorker::new(store.clone(), registry, OutboxWorkerConfig::default());
        let cancel = CancellationToken::new();
        let join = tokio::spawn(worker.run(cancel.clone()));
        tokio::time::sleep(Duration::from_millis(150)).await;
        cancel.cancel();
        join.await.unwrap().unwrap();
        assert!(
            store.claimed.lock().unwrap().is_empty(),
            "claim must not be called when no envelopes are polled"
        );
    }

    /// A failing `claim` aborts the cycle before any handler runs or anything is settled.
    #[tokio::test]
    async fn worker_aborts_poll_cycle_when_claim_fails_without_dispatching() {
        let envelope = fresh_envelope(Uuid::from_u128(1));
        let store = MockStore::new(vec![envelope]);
        store.fail_claim.store(true, Ordering::Relaxed);
        let seen = Arc::new(Mutex::new(Vec::new()));
        let handler: Arc<dyn ErasedHandler> = Arc::new(TypedHandler::new(RecordingHandler {
            seen: Arc::clone(&seen),
        }));
        let registry = registry_with(vec![handler]);
        let worker = OutboxWorker::new(store.clone(), registry, OutboxWorkerConfig::default());
        let cancel = CancellationToken::new();
        let join = tokio::spawn(worker.run(cancel.clone()));
        tokio::time::sleep(Duration::from_millis(200)).await;
        cancel.cancel();
        join.await.unwrap().unwrap();
        assert!(
            seen.lock().unwrap().is_empty(),
            "handler must not be called when claim fails"
        );
        assert!(
            store.delivered.lock().unwrap().is_empty(),
            "envelope must not be marked delivered when claim fails"
        );
        assert!(
            store.claimed.lock().unwrap().is_empty(),
            "claim ids must not be recorded when claim returns an error"
        );
    }

    /// Two envelopes sharing one `event_id` stand in for a retry.
    ///
    /// Both dispatches must see the same `message_id`, namely `MessageId::from(event_id)`.
    #[tokio::test]
    async fn worker_derives_context_ids_from_event_id_stable_across_retries() {
        let event_id = Uuid::from_u128(99);
        let e1 = OutboxEnvelope::new(
            event_id,
            &UserRegistered {
                user_id: Uuid::from_u128(1),
            },
        )
        .unwrap();
        let e2 = OutboxEnvelope::new(
            event_id,
            &UserRegistered {
                user_id: Uuid::from_u128(2),
            },
        )
        .unwrap();
        let store = MockStore::new(vec![e1, e2]);
        let captured_ids = Arc::new(Mutex::new(Vec::new()));
        let handler: Arc<dyn ErasedHandler> =
            Arc::new(TypedHandler::new(ContextCapturingHandler {
                captured_ids: Arc::clone(&captured_ids),
            }));
        let registry = registry_with(vec![handler]);
        let worker = OutboxWorker::new(store.clone(), registry, OutboxWorkerConfig::default());
        let cancel = CancellationToken::new();
        let join = tokio::spawn(worker.run(cancel.clone()));
        tokio::time::sleep(Duration::from_millis(200)).await;
        cancel.cancel();
        join.await.unwrap().unwrap();
        let ids = captured_ids.lock().unwrap();
        assert_eq!(ids.len(), 2, "both envelopes must be dispatched");
        assert_eq!(
            ids[0], ids[1],
            "message_id must be identical across dispatches of the same event_id"
        );
        assert_eq!(
            ids[0],
            MessageId::from(event_id),
            "message_id must equal MessageId::from(event_id)"
        );
    }

    /// A worker cancelled right after spawning exits well within a second (wall clock).
    #[tokio::test]
    async fn worker_stops_promptly_on_cancellation() {
        let store = MockStore::new(Vec::new());
        let registry = HashMap::new();
        let worker = OutboxWorker::new(store, registry, OutboxWorkerConfig::default());
        let cancel = CancellationToken::new();
        let join = tokio::spawn(worker.run(cancel.clone()));
        cancel.cancel();
        let started = std::time::Instant::now();
        join.await.unwrap().unwrap();
        assert!(
            started.elapsed() < Duration::from_secs(1),
            "worker took {:?} to stop",
            started.elapsed()
        );
    }

    /// Handler whose future never completes.
    struct HangingHandler;

    impl Handler<UserRegistered> for HangingHandler {
        type Error = OutboxError;
        async fn handle(
            &self,
            _event: UserRegistered,
            _ctx: &HandlerContext,
        ) -> Result<(), Self::Error> {
            std::future::pending::<()>().await;
            unreachable!("hanging handler never resolves")
        }
    }

    /// On a paused clock, a hung handler is cut off by `dispatch_timeout`.
    ///
    /// The timeout must be recorded as a failure, and the envelope must never be marked
    /// delivered.
    #[tokio::test(start_paused = true)]
    async fn worker_enforces_dispatch_timeout_on_a_hung_handler() {
        let envelope = fresh_envelope(Uuid::from_u128(1));
        let event_id = envelope.event_id;
        let store = MockStore::new(vec![envelope]);
        let handler: Arc<dyn ErasedHandler> = Arc::new(TypedHandler::new(HangingHandler));
        let registry = registry_with(vec![handler]);
        let config = OutboxWorkerConfig {
            dispatch_timeout: Duration::from_millis(50),
            batch_size: 1,
            ..OutboxWorkerConfig::default()
        };
        let worker = OutboxWorker::new(store.clone(), registry, config);
        let cancel = CancellationToken::new();
        let join = tokio::spawn(worker.run(cancel.clone()));
        tokio::time::sleep(Duration::from_millis(500)).await;
        cancel.cancel();
        join.await.unwrap().unwrap();
        let failed = store.failed.lock().unwrap();
        assert_eq!(failed.len(), 1, "hung handler must be recorded as failed");
        assert_eq!(failed[0].0, event_id);
        assert!(
            failed[0].1.contains("timed out"),
            "failure must be the dispatch timeout, got {:?}",
            failed[0].1
        );
        assert!(
            store.delivered.lock().unwrap().is_empty(),
            "a timed-out envelope must not be marked delivered"
        );
    }

    /// A failed settle (`mark_delivered` error) for one envelope must not stop the
    /// remaining envelopes of the same batch from being dispatched and delivered.
    #[tokio::test]
    async fn worker_settles_remaining_batch_when_one_ack_fails() {
        let e1 = fresh_envelope(Uuid::from_u128(1));
        let e2 = fresh_envelope(Uuid::from_u128(2));
        let id1 = e1.event_id;
        let id2 = e2.event_id;
        let store = MockStore::new(vec![e1, e2]);
        *store.fail_mark_delivered_for.lock().unwrap() = Some(id1);
        let seen = Arc::new(Mutex::new(Vec::new()));
        let handler: Arc<dyn ErasedHandler> = Arc::new(TypedHandler::new(RecordingHandler {
            seen: Arc::clone(&seen),
        }));
        let registry = registry_with(vec![handler]);
        let config = OutboxWorkerConfig {
            batch_size: 2,
            ..OutboxWorkerConfig::default()
        };
        let worker = OutboxWorker::new(store.clone(), registry, config);
        let cancel = CancellationToken::new();
        let join = tokio::spawn(worker.run(cancel.clone()));
        tokio::time::sleep(Duration::from_millis(200)).await;
        cancel.cancel();
        join.await.unwrap().unwrap();
        let delivered = store.delivered.lock().unwrap();
        assert!(
            delivered.contains(&id2),
            "second envelope must be delivered despite the first ack failing, got {delivered:?}"
        );
        assert!(
            seen.lock().unwrap().len() >= 2,
            "both envelopes must have been dispatched to the handler"
        );
    }

    /// On a paused clock, cancellation interrupts a very long idle sleep (`poll_interval`
    /// of one hour) instead of waiting it out.
    #[tokio::test(start_paused = true)]
    async fn worker_observes_cancellation_during_a_long_poll_interval() {
        let store = MockStore::new(Vec::new());
        let registry = HashMap::new();
        let config = OutboxWorkerConfig {
            poll_interval: Duration::from_secs(3600),
            ..OutboxWorkerConfig::default()
        };
        let worker = OutboxWorker::new(store, registry, config);
        let cancel = CancellationToken::new();
        let join = tokio::spawn(worker.run(cancel.clone()));
        tokio::time::sleep(Duration::from_millis(1)).await;
        cancel.cancel();
        tokio::time::timeout(Duration::from_secs(1), join)
            .await
            .expect("worker must stop without waiting out the poll interval")
            .unwrap()
            .unwrap();
    }

    /// The lease is one `dispatch_timeout` per envelope, and an empty batch counts as one.
    #[test]
    fn lease_for_scales_with_batch_size() {
        let config = OutboxWorkerConfig {
            dispatch_timeout: Duration::from_secs(30),
            ..OutboxWorkerConfig::default()
        };
        let store = MockStore::new(Vec::new());
        let worker = OutboxWorker::new(store, HashMap::new(), config);
        assert_eq!(worker.lease_for(1), Duration::from_secs(30));
        assert_eq!(worker.lease_for(10), Duration::from_secs(300));
        assert_eq!(worker.lease_for(0), Duration::from_secs(30));
    }
}