use std::sync::atomic::{AtomicUsize, Ordering};
use sea_orm::ConnectionTrait;
use tokio_util::sync::CancellationToken;
pub struct OutboxMessage {
pub partition_id: i64,
pub seq: i64,
pub payload: Vec<u8>,
pub payload_type: String,
pub created_at: chrono::DateTime<chrono::Utc>,
pub attempts: i16,
}
#[derive(Debug, Clone)]
pub enum HandlerResult {
Success,
Retry { reason: String },
Reject { reason: String },
}
#[async_trait::async_trait]
pub trait Handler: Send + Sync {
async fn handle(&self, msgs: &[OutboxMessage], cancel: CancellationToken) -> HandlerResult;
fn processed_count(&self) -> Option<usize> {
None
}
}
#[async_trait::async_trait]
pub trait TransactionalHandler: Send + Sync {
async fn handle(
&self,
txn: &dyn ConnectionTrait,
msgs: &[OutboxMessage],
cancel: CancellationToken,
) -> HandlerResult;
fn processed_count(&self) -> Option<usize> {
None
}
}
#[async_trait::async_trait]
pub trait MessageHandler: Send + Sync {
async fn handle(&self, msg: &OutboxMessage, cancel: CancellationToken) -> HandlerResult;
}
#[async_trait::async_trait]
pub trait TransactionalMessageHandler: Send + Sync {
async fn handle(
&self,
txn: &dyn ConnectionTrait,
msg: &OutboxMessage,
cancel: CancellationToken,
) -> HandlerResult;
}
pub struct PerMessageAdapter<H> {
pub handler: H,
processed: AtomicUsize,
}
impl<H> PerMessageAdapter<H> {
pub fn new(handler: H) -> Self {
Self {
handler,
processed: AtomicUsize::new(0),
}
}
}
#[async_trait::async_trait]
impl<H: MessageHandler> Handler for PerMessageAdapter<H> {
async fn handle(&self, msgs: &[OutboxMessage], cancel: CancellationToken) -> HandlerResult {
self.processed.store(0, Ordering::Release);
for msg in msgs {
let result = self.handler.handle(msg, cancel.clone()).await;
if !matches!(result, HandlerResult::Success) {
return result;
}
self.processed.fetch_add(1, Ordering::Release);
}
HandlerResult::Success
}
fn processed_count(&self) -> Option<usize> {
Some(self.processed.load(Ordering::Acquire))
}
}
#[async_trait::async_trait]
impl<H: TransactionalMessageHandler> TransactionalHandler for PerMessageAdapter<H> {
async fn handle(
&self,
txn: &dyn ConnectionTrait,
msgs: &[OutboxMessage],
cancel: CancellationToken,
) -> HandlerResult {
self.processed.store(0, Ordering::Release);
for msg in msgs {
let result = self.handler.handle(txn, msg, cancel.clone()).await;
if !matches!(result, HandlerResult::Success) {
return result;
}
self.processed.fetch_add(1, Ordering::Release);
}
HandlerResult::Success
}
fn processed_count(&self) -> Option<usize> {
Some(self.processed.load(Ordering::Acquire))
}
}
#[cfg(test)]
#[cfg_attr(coverage_nightly, coverage(off))]
mod tests {
use super::*;
use std::sync::atomic::{AtomicU32, Ordering};
struct CountingHandler {
count: AtomicU32,
}
impl CountingHandler {
fn new() -> Self {
Self {
count: AtomicU32::new(0),
}
}
}
#[async_trait::async_trait]
impl MessageHandler for CountingHandler {
async fn handle(&self, _msg: &OutboxMessage, _cancel: CancellationToken) -> HandlerResult {
self.count.fetch_add(1, Ordering::Relaxed);
HandlerResult::Success
}
}
struct FailAtHandler {
fail_at: u32,
count: AtomicU32,
reject: bool,
}
#[async_trait::async_trait]
impl MessageHandler for FailAtHandler {
async fn handle(&self, _msg: &OutboxMessage, _cancel: CancellationToken) -> HandlerResult {
let n = self.count.fetch_add(1, Ordering::Relaxed);
if n == self.fail_at {
if self.reject {
return HandlerResult::Reject {
reason: "bad".into(),
};
}
return HandlerResult::Retry {
reason: "transient".into(),
};
}
HandlerResult::Success
}
}
fn make_msg(seq: i64) -> OutboxMessage {
OutboxMessage {
partition_id: 1,
seq,
payload: vec![],
payload_type: "test".into(),
created_at: chrono::Utc::now(),
attempts: 0,
}
}
#[tokio::test]
async fn each_message_all_success() {
let handler = PerMessageAdapter::new(CountingHandler::new());
let msgs: Vec<OutboxMessage> = (1..=5).map(make_msg).collect();
let cancel = CancellationToken::new();
let result = Handler::handle(&handler, &msgs, cancel).await;
assert!(matches!(result, HandlerResult::Success));
assert_eq!(handler.handler.count.load(Ordering::Relaxed), 5);
}
#[tokio::test]
async fn each_message_stops_on_retry() {
let handler = PerMessageAdapter::new(FailAtHandler {
fail_at: 2,
count: AtomicU32::new(0),
reject: false,
});
let msgs: Vec<OutboxMessage> = (1..=5).map(make_msg).collect();
let cancel = CancellationToken::new();
let result = Handler::handle(&handler, &msgs, cancel).await;
assert!(matches!(result, HandlerResult::Retry { .. }));
assert_eq!(handler.handler.count.load(Ordering::Relaxed), 3);
}
#[tokio::test]
async fn each_message_stops_on_reject() {
let handler = PerMessageAdapter::new(FailAtHandler {
fail_at: 1,
count: AtomicU32::new(0),
reject: true,
});
let msgs: Vec<OutboxMessage> = (1..=5).map(make_msg).collect();
let cancel = CancellationToken::new();
let result = Handler::handle(&handler, &msgs, cancel).await;
assert!(matches!(result, HandlerResult::Reject { .. }));
assert_eq!(handler.handler.count.load(Ordering::Relaxed), 2);
}
#[tokio::test]
async fn each_message_empty_batch() {
let handler = PerMessageAdapter::new(CountingHandler::new());
let cancel = CancellationToken::new();
let result = Handler::handle(&handler, &[], cancel).await;
assert!(matches!(result, HandlerResult::Success));
assert_eq!(handler.handler.count.load(Ordering::Relaxed), 0);
}
#[tokio::test]
async fn each_message_reject_at_third_reports_processed_count_2() {
let handler = PerMessageAdapter::new(FailAtHandler {
fail_at: 2,
count: AtomicU32::new(0),
reject: true,
});
let msgs: Vec<OutboxMessage> = (1..=5).map(make_msg).collect();
let cancel = CancellationToken::new();
let result = Handler::handle(&handler, &msgs, cancel).await;
assert!(matches!(result, HandlerResult::Reject { .. }));
assert_eq!(Handler::processed_count(&handler), Some(2));
}
#[tokio::test]
async fn each_message_retry_at_first_reports_processed_count_0() {
let handler = PerMessageAdapter::new(FailAtHandler {
fail_at: 0,
count: AtomicU32::new(0),
reject: false,
});
let msgs: Vec<OutboxMessage> = (1..=3).map(make_msg).collect();
let cancel = CancellationToken::new();
let result = Handler::handle(&handler, &msgs, cancel).await;
assert!(matches!(result, HandlerResult::Retry { .. }));
assert_eq!(Handler::processed_count(&handler), Some(0));
}
#[tokio::test]
async fn each_message_all_success_reports_full_count() {
let handler = PerMessageAdapter::new(CountingHandler::new());
let msgs: Vec<OutboxMessage> = (1..=5).map(make_msg).collect();
let cancel = CancellationToken::new();
let result = Handler::handle(&handler, &msgs, cancel).await;
assert!(matches!(result, HandlerResult::Success));
assert_eq!(Handler::processed_count(&handler), Some(5));
}
#[tokio::test]
async fn each_message_empty_batch_reports_zero() {
let handler = PerMessageAdapter::new(CountingHandler::new());
let cancel = CancellationToken::new();
let _result = Handler::handle(&handler, &[], cancel).await;
assert_eq!(Handler::processed_count(&handler), Some(0));
}
#[tokio::test]
async fn batch_handler_returns_none_processed_count() {
struct BatchHandler;
#[async_trait::async_trait]
impl Handler for BatchHandler {
async fn handle(
&self,
_msgs: &[OutboxMessage],
_cancel: CancellationToken,
) -> HandlerResult {
HandlerResult::Success
}
}
let handler = BatchHandler;
assert_eq!(Handler::processed_count(&handler), None);
}
}