use std::sync::atomic::{AtomicUsize, Ordering};
use sea_orm::ConnectionTrait;
use super::batch::Batch;
/// A single outbox message as handed to the handler traits in this module.
///
/// NOTE(review): the per-field descriptions below are inferred from the field
/// names and types only — confirm against the outbox storage schema.
pub struct OutboxMessage {
/// Presumably the identifier of the partition this message belongs to.
pub partition_id: i64,
/// Presumably the message's sequence number within its partition.
pub seq: i64,
/// Raw message body as bytes; this module never inspects it.
pub payload: Vec<u8>,
/// Presumably a tag telling handlers how to decode `payload`.
pub payload_type: String,
/// UTC timestamp; presumably when the message was enqueued.
pub created_at: chrono::DateTime<chrono::Utc>,
/// Presumably the number of delivery attempts made so far.
pub attempts: i16,
}
/// Outcome reported by a batch-level handler ([`LeasedHandler`] /
/// [`TransactionalHandler`]) and by [`TransactionalMessageHandler`].
///
/// Derives `PartialEq`/`Eq` so outcomes can be compared directly (for example
/// with `assert_eq!`) instead of only being pattern-matched.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum HandlerResult {
    /// The handler finished without requesting a retry or a rejection.
    Success,
    /// The handler asks for the work to be attempted again.
    Retry {
        /// Human-readable explanation of why a retry is requested.
        reason: String,
    },
    /// The handler refuses the work.
    Reject {
        /// Human-readable explanation of why the work was rejected.
        reason: String,
    },
}
/// Outcome reported by a [`LeasedMessageHandler`] for a single message.
///
/// Derives `PartialEq`/`Eq` so outcomes can be compared directly (for example
/// with `assert_eq!`) instead of only being pattern-matched.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum MessageResult {
    /// The message was handled; the blanket [`LeasedHandler`] impl acknowledges it.
    Ok,
    /// The message should be attempted again; the blanket [`LeasedHandler`]
    /// impl stops the batch and returns [`HandlerResult::Retry`].
    Retry,
    /// The message is refused for the given reason; the blanket
    /// [`LeasedHandler`] impl records the rejection and keeps going.
    Reject(String),
}
/// Batch-level handler: receives a mutable [`Batch`] and reports one
/// [`HandlerResult`] for the whole call.
///
/// Any [`LeasedMessageHandler`] gets this trait for free through the blanket
/// impl in this module.
#[async_trait::async_trait]
pub trait LeasedHandler: Send + Sync {
/// Processes `batch` and returns the overall outcome.
///
/// The handler drives the batch itself (pulling messages, acknowledging or
/// rejecting them) through the methods exposed by [`Batch`].
async fn handle(&self, batch: &mut Batch<'_>) -> HandlerResult;
}
/// Per-message handler: sees one [`OutboxMessage`] at a time and reports a
/// [`MessageResult`] for it.
///
/// Implementors automatically implement [`LeasedHandler`] via the blanket impl
/// in this module, which calls `handle` for each message pulled from a batch.
#[async_trait::async_trait]
pub trait LeasedMessageHandler: Send + Sync {
/// Handles a single message and returns its outcome.
async fn handle(&self, msg: &OutboxMessage) -> MessageResult;
}
/// Blanket adapter: every [`LeasedMessageHandler`] is also a [`LeasedHandler`]
/// that feeds the batch to the per-message handler one message at a time.
#[async_trait::async_trait]
impl<H: LeasedMessageHandler> LeasedHandler for H {
    /// Drains `batch` through the per-message handler.
    ///
    /// * `MessageResult::Ok` acknowledges the message and continues.
    /// * `MessageResult::Reject` records the rejection and continues.
    /// * `MessageResult::Retry` stops immediately with [`HandlerResult::Retry`].
    ///
    /// Returns [`HandlerResult::Success`] once the batch yields no more
    /// messages, or as soon as `batch.remaining()` reports zero after a message
    /// has been handled (presumably the time left before the batch deadline —
    /// confirm against `Batch`).
    async fn handle(&self, batch: &mut Batch<'_>) -> HandlerResult {
        loop {
            let current = match batch.next_msg() {
                Some(next) => next,
                // Nothing left to hand out: the batch is done.
                None => return HandlerResult::Success,
            };

            let outcome = LeasedMessageHandler::handle(self, current).await;
            match outcome {
                MessageResult::Retry => {
                    return HandlerResult::Retry {
                        reason: String::from("message handler returned Retry"),
                    };
                }
                MessageResult::Ok => batch.ack(),
                MessageResult::Reject(why) => batch.reject(why),
            }

            // Checked only after a message was handled, so at least one
            // message is always attempted per call.
            if batch.remaining().is_zero() {
                return HandlerResult::Success;
            }
        }
    }
}
/// Batch-level handler that is given a database connection handle (`txn`)
/// together with a slice of messages.
///
/// NOTE(review): the parameter name suggests `txn` is an open transaction
/// shared with the caller — confirm against the call site.
#[async_trait::async_trait]
pub trait TransactionalHandler: Send + Sync {
/// Handles all of `msgs` using `txn` and returns one outcome for the call.
async fn handle(&self, txn: &dyn ConnectionTrait, msgs: &[OutboxMessage]) -> HandlerResult;
/// Optional progress report for the most recent `handle` call.
///
/// The default implementation returns `None`; [`PerMessageAdapter`]
/// overrides it to return the value of its internal counter.
fn processed_count(&self) -> Option<usize> {
None
}
}
/// Per-message counterpart of [`TransactionalHandler`]: handles one
/// [`OutboxMessage`] at a time using the supplied connection handle.
///
/// Wrap an implementor in [`PerMessageAdapter`] to obtain a
/// [`TransactionalHandler`].
#[async_trait::async_trait]
pub trait TransactionalMessageHandler: Send + Sync {
/// Handles a single message using `txn` and returns its outcome.
async fn handle(&self, txn: &dyn ConnectionTrait, msg: &OutboxMessage) -> HandlerResult;
}
/// Wraps a per-message handler and keeps a counter of how many messages it
/// has completed in the current batch call.
///
/// When `H` implements [`TransactionalMessageHandler`], the adapter implements
/// [`TransactionalHandler`].
pub struct PerMessageAdapter<H> {
    /// The wrapped per-message handler.
    pub handler: H,
    /// Number of messages that completed successfully in the latest call.
    processed: AtomicUsize,
}

impl<H> PerMessageAdapter<H> {
    /// Creates an adapter around `handler` with the processed counter at zero.
    pub fn new(handler: H) -> Self {
        // `AtomicUsize::default()` is an atomic initialised to 0.
        let processed = AtomicUsize::default();
        Self { processed, handler }
    }
}
/// Runs the wrapped per-message handler over each message in order, stopping
/// at the first non-success outcome.
#[async_trait::async_trait]
impl<H: TransactionalMessageHandler> TransactionalHandler for PerMessageAdapter<H> {
    /// Handles `msgs` one by one with the wrapped handler.
    ///
    /// The processed counter is reset to zero on entry and incremented after
    /// every message that returns [`HandlerResult::Success`]. The first other
    /// outcome is returned as-is and the remaining messages are not attempted.
    async fn handle(&self, txn: &dyn ConnectionTrait, msgs: &[OutboxMessage]) -> HandlerResult {
        // Start each call from a clean count.
        self.processed.store(0, Ordering::Release);

        for current in msgs {
            match self.handler.handle(txn, current).await {
                HandlerResult::Success => {
                    self.processed.fetch_add(1, Ordering::Release);
                }
                // Any non-success outcome ends the batch and is propagated.
                stopped => return stopped,
            }
        }

        HandlerResult::Success
    }

    /// Returns the current value of the processed counter, i.e. how many
    /// messages succeeded since the latest `handle` call began.
    fn processed_count(&self) -> Option<usize> {
        let done = self.processed.load(Ordering::Acquire);
        Some(done)
    }
}
// Unit tests for the blanket `LeasedHandler` impl over `LeasedMessageHandler`.
#[cfg(test)]
#[cfg_attr(coverage_nightly, coverage(off))]
mod tests {
use super::*;
use std::sync::atomic::{AtomicU32, Ordering};
// Builds a message with the given `seq`, an empty payload and fixed
// placeholder values for the remaining fields.
fn make_msg(seq: i64) -> OutboxMessage {
OutboxMessage {
partition_id: 1,
seq,
payload: vec![],
payload_type: "test".into(),
created_at: chrono::Utc::now(),
attempts: 0,
}
}
// Handler that accepts every message and counts how often it was called.
struct LeasedCountingHandler {
count: AtomicU32,
}
impl LeasedCountingHandler {
fn new() -> Self {
Self {
count: AtomicU32::new(0),
}
}
}
#[async_trait::async_trait]
impl LeasedMessageHandler for LeasedCountingHandler {
async fn handle(&self, _msg: &OutboxMessage) -> MessageResult {
self.count.fetch_add(1, Ordering::Relaxed);
MessageResult::Ok
}
}
// Handler that succeeds on every call except the one whose zero-based call
// index equals `fail_at`; that call returns `Reject("bad")` when `reject`
// is true and `Retry` otherwise.
struct LeasedFailAtHandler {
fail_at: u32,
count: AtomicU32,
reject: bool,
}
#[async_trait::async_trait]
impl LeasedMessageHandler for LeasedFailAtHandler {
async fn handle(&self, _msg: &OutboxMessage) -> MessageResult {
// `fetch_add` returns the previous value, so `n` is the zero-based call index.
let n = self.count.fetch_add(1, Ordering::Relaxed);
if n == self.fail_at {
if self.reject {
return MessageResult::Reject("bad".into());
}
return MessageResult::Retry;
}
MessageResult::Ok
}
}
// All five messages succeed: the batch reports five processed messages and
// the handler was invoked five times.
#[tokio::test]
async fn leased_blanket_all_success() {
let handler = LeasedCountingHandler::new();
let msgs: Vec<OutboxMessage> = (1..=5).map(make_msg).collect();
let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(30);
let mut batch = Batch::new(&msgs, deadline);
let result = LeasedHandler::handle(&handler, &mut batch).await;
assert!(matches!(result, HandlerResult::Success));
assert_eq!(batch.processed(), 5);
assert_eq!(handler.count.load(Ordering::Relaxed), 5);
}
// The third call (index 2) returns Retry: the batch stops with
// `HandlerResult::Retry` and only the first two messages count as processed.
#[tokio::test]
async fn leased_blanket_retry_at_third() {
let handler = LeasedFailAtHandler {
fail_at: 2,
count: AtomicU32::new(0),
reject: false,
};
let msgs: Vec<OutboxMessage> = (1..=5).map(make_msg).collect();
let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(30);
let mut batch = Batch::new(&msgs, deadline);
let result = LeasedHandler::handle(&handler, &mut batch).await;
assert!(matches!(result, HandlerResult::Retry { .. }));
assert_eq!(batch.processed(), 2);
}
// The second call (index 1) is rejected: processing continues, all five
// messages count as processed and exactly one rejection is recorded at index 1.
#[tokio::test]
async fn leased_blanket_reject_continues() {
let handler = LeasedFailAtHandler {
fail_at: 1,
count: AtomicU32::new(0),
reject: true,
};
let msgs: Vec<OutboxMessage> = (1..=5).map(make_msg).collect();
let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(30);
let mut batch = Batch::new(&msgs, deadline);
let result = LeasedHandler::handle(&handler, &mut batch).await;
assert!(matches!(result, HandlerResult::Success));
assert_eq!(batch.processed(), 5);
assert_eq!(batch.rejections().len(), 1);
assert_eq!(batch.rejections()[0].index, 1);
}
}