#![allow(clippy::unwrap_used, clippy::expect_used)]
use std::sync::atomic::{AtomicU32, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use sea_orm::{ConnectionTrait, DbBackend, FromQueryResult, Statement};
use tokio_util::sync::CancellationToken;
use super::batch::Batch;
use super::dead_letter::{DeadLetterFilter, DeadLetterScope};
use super::dialect::Dialect;
use super::handler::{
HandlerResult, LeasedHandler, LeasedMessageHandler, MessageResult, OutboxMessage,
PerMessageAdapter, TransactionalHandler, TransactionalMessageHandler,
};
use super::prioritizer::SharedPrioritizer;
use super::strategy::{LeasedStrategy, ProcessContext, ProcessingStrategy, TransactionalStrategy};
use super::taskward::{Directive, WorkerAction};
use super::types::{EnqueueMessage, LeaseConfig, OutboxConfig, SequencerConfig, WorkerTuning};
use super::workers::sequencer::Sequencer;
use super::{Outbox, OutboxError, Partitions};
use crate::migration_runner::run_migrations_for_testing;
use crate::outbox::OutboxMessageId;
use crate::{ConnectOpts, Db, connect_db};
/// Test fixture bundling an [`Outbox`] with the shared prioritizer that was
/// installed into it by `make_test_outbox`, so tests can inspect both.
struct TestOutbox {
    outbox: Arc<Outbox>,
    prioritizer: Arc<SharedPrioritizer>,
}
/// Snapshot of a single `modkit_outbox_processor` row, as read by
/// `read_processor_state`.
#[derive(Debug)]
struct ProcessorSnapshot {
    processed_seq: i64,
    attempts: i16,
    last_error: Option<String>,
    // `None` when no lease is currently held.
    locked_by: Option<String>,
    // Read via `CAST(... AS TEXT)` in the query, hence a string.
    locked_until: Option<String>,
}
/// Snapshot of a single `modkit_outbox_outgoing` row, as read by
/// `read_outgoing`.
#[derive(Debug)]
struct OutgoingSnapshot {
    id: i64,
    partition_id: i64,
    body_id: i64,
    seq: i64,
}
/// Snapshot of a single `modkit_outbox_dead_letters` row, as read by
/// `read_dead_letters`.
#[derive(Debug)]
struct DeadLetterSnapshot {
    id: i64,
    partition_id: i64,
    seq: i64,
    payload: Vec<u8>,
    payload_type: String,
    last_error: Option<String>,
    attempts: i16,
    status: String,
    // Both timestamps are read via `CAST(... AS TEXT)` in the query.
    completed_at: Option<String>,
    deadline: Option<String>,
}
/// Opens a named in-memory SQLite database (shared cache so every connection
/// sees the same data) limited to one connection, then applies the outbox
/// migrations. Panics on any failure — this is test-only setup code.
async fn setup_db(name: &str) -> Db {
    let opts = ConnectOpts {
        max_conns: Some(1),
        ..Default::default()
    };
    let url = format!("sqlite:file:{name}?mode=memory&cache=shared");
    let db = connect_db(&url, opts).await.expect("connect");
    run_migrations_for_testing(&db, super::outbox_migrations())
        .await
        .expect("migrations");
    db
}
/// Builds an `Outbox` from `config` and installs a fresh shared prioritizer
/// into it, returning both as a [`TestOutbox`] fixture.
async fn make_test_outbox(config: OutboxConfig) -> TestOutbox {
    let outbox = Arc::new(Outbox::new(config));
    let prioritizer = Arc::new(SharedPrioritizer::new());
    {
        // Scope the write guard so it is dropped before constructing the fixture.
        let mut slot = outbox.prioritizer.write().await;
        slot.replace(Arc::clone(&prioritizer));
    }
    TestOutbox {
        outbox,
        prioritizer,
    }
}
/// Convenience wrapper: [`make_test_outbox`] with a default [`OutboxConfig`].
async fn make_default_test_outbox() -> TestOutbox {
    let config = OutboxConfig::default();
    make_test_outbox(config).await
}
/// Creates a fresh, reference-counted [`SharedPrioritizer`].
fn make_shared_prioritizer() -> Arc<SharedPrioritizer> {
    let prioritizer = SharedPrioritizer::new();
    Arc::new(prioritizer)
}
/// Builds a [`Sequencer`] wired to the fixture's outbox and prioritizer.
fn make_sequencer(t: &TestOutbox, config: SequencerConfig, db: &Db) -> Sequencer {
    let outbox = Arc::clone(&t.outbox);
    let prioritizer = Arc::clone(&t.prioritizer);
    Sequencer::new(config, outbox, db.clone(), prioritizer)
}
/// Enqueues each payload (as `text/plain`) into `queue`/`partition`, one
/// `enqueue` call per payload, and returns the resulting message ids in order.
/// Panics on any failure.
async fn enqueue_msgs(
    outbox: &Outbox,
    db: &Db,
    queue: &str,
    partition: u32,
    payloads: &[&str],
) -> Vec<OutboxMessageId> {
    let conn = db.conn().expect("conn");
    let mut collected = Vec::with_capacity(payloads.len());
    for &payload in payloads {
        let message_id = outbox
            .enqueue(
                &conn,
                queue,
                partition,
                payload.as_bytes().to_vec(),
                "text/plain",
            )
            .await
            .expect("enqueue");
        collected.push(message_id);
    }
    collected
}
/// Drives the sequencer until it stops returning `Directive::Proceed`.
async fn run_sequencer_until_idle(seq: &mut Sequencer) {
    let cancel = CancellationToken::new();
    loop {
        match seq.execute(&cancel).await.unwrap() {
            Directive::Proceed(_) => continue,
            _ => break,
        }
    }
}
/// Builds a default-configured sequencer and runs it until idle.
async fn run_sequencer_once(t: &TestOutbox, db: &Db) {
    let mut sequencer = make_sequencer(t, SequencerConfig::default(), db);
    run_sequencer_until_idle(&mut sequencer).await;
}
/// Enqueues the payloads, then runs the sequencer once so the messages land
/// in the outgoing table. Returns the enqueued message ids.
async fn enqueue_and_sequence(
    t: &TestOutbox,
    db: &Db,
    queue: &str,
    partition: u32,
    payloads: &[&str],
) -> Vec<OutboxMessageId> {
    let message_ids = enqueue_msgs(&t.outbox, db, queue, partition, payloads).await;
    run_sequencer_once(t, db).await;
    message_ids
}
/// Marks the partition as if a now-dead pod ("crashed-pod") still held a
/// lease for `lease_secs` seconds, bumping `attempts` by one.
async fn simulate_crash(db: &Db, partition_id: i64, lease_secs: i64) {
    let stmt = Statement::from_sql_and_values(
        DbBackend::Sqlite,
        "UPDATE modkit_outbox_processor \
         SET locked_by = $1, \
         locked_until = datetime('now', '+' || $2 || ' seconds'), \
         attempts = attempts + 1 \
         WHERE partition_id = $3",
        ["crashed-pod".into(), lease_secs.into(), partition_id.into()],
    );
    db.sea_internal()
        .execute(stmt)
        .await
        .expect("simulate_crash");
}
/// Forces the partition's lease into the past so it reads as expired.
async fn expire_lease(db: &Db, partition_id: i64) {
    let stmt = Statement::from_sql_and_values(
        DbBackend::Sqlite,
        "UPDATE modkit_outbox_processor \
         SET locked_until = datetime('now', '-1 seconds') \
         WHERE partition_id = $1",
        [partition_id.into()],
    );
    db.sea_internal().execute(stmt).await.expect("expire_lease");
}
/// Returns `SELECT COUNT(*)` for `table`. Panics if the query fails.
async fn count_rows(db: &Db, table: &str) -> i64 {
    #[derive(Debug, FromQueryResult)]
    struct Count {
        cnt: i64,
    }
    let sql = format!("SELECT COUNT(*) AS cnt FROM {table}");
    let stmt = Statement::from_string(DbBackend::Sqlite, sql);
    let conn = db.sea_internal();
    let row = Count::find_by_statement(stmt)
        .one(&conn)
        .await
        .expect("count query")
        .expect("count row");
    row.cnt
}
/// Reads the `modkit_outbox_processor` row for `partition_id` into a
/// [`ProcessorSnapshot`]. Panics if the row is missing.
async fn read_processor_state(db: &Db, partition_id: i64) -> ProcessorSnapshot {
    #[derive(Debug, FromQueryResult)]
    struct Row {
        processed_seq: i64,
        attempts: i16,
        last_error: Option<String>,
        locked_by: Option<String>,
        locked_until: Option<String>,
    }
    let stmt = Statement::from_sql_and_values(
        DbBackend::Sqlite,
        "SELECT processed_seq, attempts, last_error, locked_by, \
         CAST(locked_until AS TEXT) AS locked_until \
         FROM modkit_outbox_processor WHERE partition_id = $1",
        [partition_id.into()],
    );
    let conn = db.sea_internal();
    let Row {
        processed_seq,
        attempts,
        last_error,
        locked_by,
        locked_until,
    } = Row::find_by_statement(stmt)
        .one(&conn)
        .await
        .expect("query")
        .expect("processor row");
    ProcessorSnapshot {
        processed_seq,
        attempts,
        last_error,
        locked_by,
        locked_until,
    }
}
/// Reads all `modkit_outbox_outgoing` rows for `partition_id`, ordered by
/// their per-partition sequence number.
async fn read_outgoing(db: &Db, partition_id: i64) -> Vec<OutgoingSnapshot> {
    #[derive(Debug, FromQueryResult)]
    struct Row {
        id: i64,
        partition_id: i64,
        body_id: i64,
        seq: i64,
    }
    let stmt = Statement::from_sql_and_values(
        DbBackend::Sqlite,
        "SELECT id, partition_id, body_id, seq \
         FROM modkit_outbox_outgoing WHERE partition_id = $1 ORDER BY seq",
        [partition_id.into()],
    );
    let conn = db.sea_internal();
    let rows = Row::find_by_statement(stmt).all(&conn).await.expect("query");
    let mut snapshots = Vec::with_capacity(rows.len());
    for Row {
        id,
        partition_id,
        body_id,
        seq,
    } in rows
    {
        snapshots.push(OutgoingSnapshot {
            id,
            partition_id,
            body_id,
            seq,
        });
    }
    snapshots
}
/// Reads every row of `modkit_outbox_dead_letters`, ordered by seq.
async fn read_dead_letters(db: &Db) -> Vec<DeadLetterSnapshot> {
    #[derive(Debug, FromQueryResult)]
    struct Row {
        id: i64,
        partition_id: i64,
        seq: i64,
        payload: Vec<u8>,
        payload_type: String,
        last_error: Option<String>,
        attempts: i16,
        status: String,
        completed_at: Option<String>,
        deadline: Option<String>,
    }
    let stmt = Statement::from_string(
        DbBackend::Sqlite,
        "SELECT id, partition_id, seq, payload, payload_type, last_error, \
         attempts, status, CAST(completed_at AS TEXT) AS completed_at, \
         CAST(deadline AS TEXT) AS deadline \
         FROM modkit_outbox_dead_letters ORDER BY seq",
    );
    let conn = db.sea_internal();
    let rows = Row::find_by_statement(stmt).all(&conn).await.expect("query");
    rows.into_iter()
        .map(
            |Row {
                 id,
                 partition_id,
                 seq,
                 payload,
                 payload_type,
                 last_error,
                 attempts,
                 status,
                 completed_at,
                 deadline,
             }| DeadLetterSnapshot {
                id,
                partition_id,
                seq,
                payload,
                payload_type,
                last_error,
                attempts,
                status,
                completed_at,
                deadline,
            },
        )
        .collect()
}
/// Returns the `sequence` counter of the given partition row. Panics if the
/// partition does not exist.
async fn read_partition_sequence(db: &Db, partition_id: i64) -> i64 {
    #[derive(Debug, FromQueryResult)]
    struct Row {
        sequence: i64,
    }
    let stmt = Statement::from_sql_and_values(
        DbBackend::Sqlite,
        "SELECT sequence FROM modkit_outbox_partitions WHERE id = $1",
        [partition_id.into()],
    );
    let conn = db.sea_internal();
    let row = Row::find_by_statement(stmt)
        .one(&conn)
        .await
        .expect("query")
        .expect("partition row");
    row.sequence
}
/// Polls `f` every 10ms until it returns `true`, panicking if the deadline
/// of `timeout_ms` milliseconds passes first.
async fn poll_until<F, Fut>(f: F, timeout_ms: u64)
where
    F: Fn() -> Fut,
    Fut: std::future::Future<Output = bool>,
{
    let deadline = tokio::time::Instant::now() + Duration::from_millis(timeout_ms);
    while !f().await {
        assert!(
            tokio::time::Instant::now() < deadline,
            "poll_until timed out after {timeout_ms}ms"
        );
        tokio::time::sleep(Duration::from_millis(10)).await;
    }
}
/// Leased batch handler that acks every message and counts how many it saw.
struct CountingSuccessHandler {
    count: Arc<AtomicU32>,
}
#[async_trait::async_trait]
impl LeasedHandler for CountingSuccessHandler {
    async fn handle(&self, batch: &mut Batch<'_>) -> HandlerResult {
        loop {
            if batch.next_msg().is_none() {
                break;
            }
            self.count.fetch_add(1, Ordering::Relaxed);
            batch.ack();
        }
        HandlerResult::Success
    }
}
/// Per-message leased handler that succeeds on every message, counting calls.
struct CountingMessageHandler {
    count: Arc<AtomicU32>,
}
#[async_trait::async_trait]
impl LeasedMessageHandler for CountingMessageHandler {
    async fn handle(&self, _msg: &OutboxMessage) -> MessageResult {
        // fetch_add returns the previous value; we only care about the side effect.
        let _ = self.count.fetch_add(1, Ordering::Relaxed);
        MessageResult::Ok
    }
}
/// Per-message leased handler that requests a retry for every message.
struct AlwaysRetryHandler;
#[async_trait::async_trait]
impl LeasedMessageHandler for AlwaysRetryHandler {
    async fn handle(&self, _msg: &OutboxMessage) -> MessageResult {
        MessageResult::Retry
    }
}
/// Per-message leased handler that permanently rejects every message,
/// which should route each one to the dead-letter table.
struct AlwaysRejectHandler;
#[async_trait::async_trait]
impl LeasedMessageHandler for AlwaysRejectHandler {
    async fn handle(&self, _msg: &OutboxMessage) -> MessageResult {
        MessageResult::Reject("permanently bad".into())
    }
}
/// Per-message leased handler that records the `attempts` value it observes
/// on every message, then succeeds.
struct AttemptsRecorder {
    seen_attempts: Arc<Mutex<Vec<i16>>>,
}
#[async_trait::async_trait]
impl LeasedMessageHandler for AttemptsRecorder {
    async fn handle(&self, msg: &OutboxMessage) -> MessageResult {
        let mut seen = self.seen_attempts.lock().unwrap();
        seen.push(msg.attempts);
        MessageResult::Ok
    }
}
/// Transactional handler that succeeds and counts every message it receives.
struct CountingTxHandler {
    count: Arc<AtomicU32>,
}
#[async_trait::async_trait]
impl TransactionalHandler for CountingTxHandler {
    async fn handle(&self, _txn: &dyn ConnectionTrait, msgs: &[OutboxMessage]) -> HandlerResult {
        // Batch sizes in these tests are tiny, so the u32 cast cannot truncate.
        #[allow(clippy::cast_possible_truncation)]
        let n = msgs.len() as u32;
        self.count.fetch_add(n, Ordering::Relaxed);
        HandlerResult::Success
    }
}
/// Transactional handler that asks for a retry on every batch.
struct AlwaysRetryTxHandler;
#[async_trait::async_trait]
impl TransactionalHandler for AlwaysRetryTxHandler {
    async fn handle(&self, _txn: &dyn ConnectionTrait, _msgs: &[OutboxMessage]) -> HandlerResult {
        HandlerResult::Retry {
            reason: "transient tx failure".into(),
        }
    }
}
/// Transactional handler that permanently rejects every batch, which should
/// route the messages to the dead-letter table.
struct AlwaysRejectTxHandler;
#[async_trait::async_trait]
impl TransactionalHandler for AlwaysRejectTxHandler {
    async fn handle(&self, _txn: &dyn ConnectionTrait, _msgs: &[OutboxMessage]) -> HandlerResult {
        HandlerResult::Reject {
            reason: "permanently bad tx".into(),
        }
    }
}
/// Per-message leased handler that rejects exactly the configured sequence
/// numbers and succeeds on everything else.
struct PoisonMessageHandler {
    poison_seqs: Vec<i64>,
}
#[async_trait::async_trait]
impl LeasedMessageHandler for PoisonMessageHandler {
    async fn handle(&self, msg: &OutboxMessage) -> MessageResult {
        if !self.poison_seqs.contains(&msg.seq) {
            return MessageResult::Ok;
        }
        MessageResult::Reject(format!("poison seq={}", msg.seq))
    }
}
/// Registering a queue with 4 partitions creates matching partition and
/// processor rows, each starting with a zeroed cursor and attempt counter.
#[tokio::test]
async fn registration_creates_partition_and_processor_rows() {
    let db = setup_db("ch1_creates_rows").await;
    let fixture = make_default_test_outbox().await;
    fixture.outbox.register_queue(&db, "orders", 4).await.unwrap();
    assert_eq!(
        count_rows(&db, "modkit_outbox_partitions").await,
        4,
        "4 partition rows"
    );
    assert_eq!(
        count_rows(&db, "modkit_outbox_processor").await,
        4,
        "4 processor rows"
    );
    for id in fixture.outbox.all_partition_ids() {
        let state = read_processor_state(&db, id).await;
        assert_eq!(state.processed_seq, 0);
        assert_eq!(state.attempts, 0);
    }
}
/// Registering the same queue twice must not create duplicate partitions.
#[tokio::test]
async fn registration_is_idempotent() {
    let db = setup_db("ch1_idempotent").await;
    let fixture = make_default_test_outbox().await;
    for _ in 0..2 {
        fixture.outbox.register_queue(&db, "orders", 4).await.unwrap();
    }
    assert_eq!(
        count_rows(&db, "modkit_outbox_partitions").await,
        4,
        "still exactly 4 - no duplicates"
    );
}
/// Re-registering a queue with a different partition count must fail with
/// `PartitionCountMismatch` reporting both the requested and existing counts.
#[tokio::test]
async fn registration_rejects_mismatched_partition_count() {
    let db = setup_db("ch1_mismatch").await;
    let fixture = make_default_test_outbox().await;
    fixture.outbox.register_queue(&db, "orders", 4).await.unwrap();
    let err = fixture
        .outbox
        .register_queue(&db, "orders", 2)
        .await
        .unwrap_err();
    let is_mismatch = matches!(
        err,
        OutboxError::PartitionCountMismatch {
            expected: 2,
            found: 4,
            ..
        }
    );
    assert!(is_mismatch);
}
/// Two queues with two partitions each must yield four globally distinct
/// partition ids.
#[tokio::test]
async fn registration_multiple_queues_distinct_ids() {
    let db = setup_db("ch1_multi_queue").await;
    let t = make_default_test_outbox().await;
    t.outbox.register_queue(&db, "a", 2).await.unwrap();
    t.outbox.register_queue(&db, "b", 2).await.unwrap();
    let all_ids = t.outbox.all_partition_ids();
    assert_eq!(all_ids.len(), 4);
    // `Vec::dedup` only removes *consecutive* duplicates, so the ids must be
    // sorted first or non-adjacent duplicates would slip through the check.
    let mut deduped = all_ids;
    deduped.sort_unstable();
    deduped.dedup();
    assert_eq!(deduped.len(), 4);
}
/// Every partition id must map back to its owning queue name.
#[tokio::test]
async fn registration_partition_to_queue_reverse_lookup() {
    let db = setup_db("ch1_reverse_lookup").await;
    let fixture = make_default_test_outbox().await;
    fixture.outbox.register_queue(&db, "orders", 2).await.unwrap();
    let ids = fixture.outbox.all_partition_ids();
    assert_eq!(ids.len(), 2);
    for id in ids {
        let queue = fixture.outbox.partition_to_queue(id);
        assert_eq!(queue.as_deref(), Some("orders"));
    }
}
/// A single enqueue writes exactly one body row and one incoming row.
#[tokio::test]
async fn enqueue_single_creates_body_and_incoming() {
    let db = setup_db("ch2_single").await;
    let t = make_default_test_outbox().await;
    t.outbox.register_queue(&db, "q", 1).await.unwrap();
    enqueue_msgs(&t.outbox, &db, "q", 0, &["hello"]).await;
    for table in ["modkit_outbox_body", "modkit_outbox_incoming"] {
        assert_eq!(count_rows(&db, table).await, 1);
    }
}
/// Enqueue returns exactly one id per message, with a positive value.
#[tokio::test]
async fn enqueue_returns_correct_id() {
    let db = setup_db("ch2_correct_id").await;
    let t = make_default_test_outbox().await;
    t.outbox.register_queue(&db, "q", 1).await.unwrap();
    let ids = enqueue_msgs(&t.outbox, &db, "q", 0, &["msg"]).await;
    assert_eq!(ids.len(), 1);
    let first = &ids[0];
    assert!(first.0 > 0);
}
/// Inserting a body row inside a transaction and rolling back must leave
/// both the body and incoming tables empty.
#[tokio::test]
async fn enqueue_tx_rollback_leaves_no_rows() {
    let db = setup_db("ch2_rollback").await;
    let t = make_default_test_outbox().await;
    t.outbox.register_queue(&db, "q", 1).await.unwrap();
    let conn = db.sea_internal();
    let txn = sea_orm::TransactionTrait::begin(&conn).await.unwrap();
    let insert = Statement::from_sql_and_values(
        DbBackend::Sqlite,
        "INSERT INTO modkit_outbox_body (payload, payload_type) VALUES ($1, $2)",
        [b"data".to_vec().into(), "text/plain".into()],
    );
    txn.execute(insert).await.unwrap();
    txn.rollback().await.unwrap();
    for table in ["modkit_outbox_body", "modkit_outbox_incoming"] {
        assert_eq!(count_rows(&db, table).await, 0);
    }
}
/// Enqueue works through a plain (non-transactional) connection too.
#[tokio::test]
async fn enqueue_with_standalone_connection() {
    let db = setup_db("ch2_standalone").await;
    let t = make_default_test_outbox().await;
    t.outbox.register_queue(&db, "q", 1).await.unwrap();
    enqueue_msgs(&t.outbox, &db, "q", 0, &["standalone"]).await;
    for table in ["modkit_outbox_body", "modkit_outbox_incoming"] {
        assert_eq!(count_rows(&db, table).await, 1);
    }
}
#[tokio::test]
async fn enqueue_batch_creates_n_items() {
let db = setup_db("ch2_batch_n").await;
let t = make_default_test_outbox().await;
t.outbox.register_queue(&db, "q", 1).await.unwrap();
let items: Vec<EnqueueMessage<'_>> = (0..50)
.map(|i| EnqueueMessage {
partition: 0,
payload: format!("msg-{i}").into_bytes(),
payload_type: "text/plain",
})
.collect();
let conn = db.conn().unwrap();
let ids = t.outbox.enqueue_batch(&conn, "q", &items).await.unwrap();
assert_eq!(ids.len(), 50);
assert_eq!(count_rows(&db, "modkit_outbox_body").await, 50);
assert_eq!(count_rows(&db, "modkit_outbox_incoming").await, 50);
}
#[tokio::test]
async fn enqueue_batch_mixed_partitions() {
let db = setup_db("ch2_batch_mixed").await;
let t = make_default_test_outbox().await;
t.outbox.register_queue(&db, "q", 2).await.unwrap();
let items: Vec<EnqueueMessage<'_>> = vec![
EnqueueMessage {
partition: 0,
payload: b"a".to_vec(),
payload_type: "text/plain",
},
EnqueueMessage {
partition: 1,
payload: b"b".to_vec(),
payload_type: "text/plain",
},
EnqueueMessage {
partition: 0,
payload: b"c".to_vec(),
payload_type: "text/plain",
},
EnqueueMessage {
partition: 1,
payload: b"d".to_vec(),
payload_type: "text/plain",
},
];
let conn = db.conn().unwrap();
let ids = t.outbox.enqueue_batch(&conn, "q", &items).await.unwrap();
assert_eq!(ids.len(), 4);
assert_eq!(count_rows(&db, "modkit_outbox_incoming").await, 4);
}
#[tokio::test]
async fn enqueue_batch_one_invalid_rejects_entire_batch() {
let db = setup_db("ch2_batch_invalid").await;
let t = make_default_test_outbox().await;
t.outbox.register_queue(&db, "q", 1).await.unwrap();
let oversized = vec![0u8; 64 * 1024 + 1];
let items: Vec<EnqueueMessage<'_>> = vec![
EnqueueMessage {
partition: 0,
payload: b"ok".to_vec(),
payload_type: "text/plain",
},
EnqueueMessage {
partition: 0,
payload: oversized,
payload_type: "text/plain",
},
];
let conn = db.conn().unwrap();
let err = t
.outbox
.enqueue_batch(&conn, "q", &items)
.await
.unwrap_err();
assert!(matches!(err, OutboxError::PayloadTooLarge { .. }));
assert_eq!(count_rows(&db, "modkit_outbox_body").await, 0);
}
#[tokio::test]
async fn enqueue_empty_batch_returns_empty_vec() {
let db = setup_db("ch2_batch_empty").await;
let t = make_default_test_outbox().await;
t.outbox.register_queue(&db, "q", 1).await.unwrap();
let conn = db.conn().unwrap();
let ids = t.outbox.enqueue_batch(&conn, "q", &[]).await.unwrap();
assert!(ids.is_empty());
}
#[tokio::test]
async fn enqueue_batch_over_chunk_size_works() {
let db = setup_db("ch2_batch_chunk").await;
let t = make_default_test_outbox().await;
t.outbox.register_queue(&db, "q", 1).await.unwrap();
let items: Vec<EnqueueMessage<'_>> = (0..150)
.map(|i| EnqueueMessage {
partition: 0,
payload: format!("msg-{i}").into_bytes(),
payload_type: "text/plain",
})
.collect();
let conn = db.conn().unwrap();
let ids = t.outbox.enqueue_batch(&conn, "q", &items).await.unwrap();
assert_eq!(ids.len(), 150);
assert_eq!(count_rows(&db, "modkit_outbox_body").await, 150);
assert_eq!(count_rows(&db, "modkit_outbox_incoming").await, 150);
}
/// A payload exceeding the size limit must be rejected with
/// `PayloadTooLarge`.
#[tokio::test]
async fn enqueue_oversized_payload_rejected() {
    let db = setup_db("ch2_oversized").await;
    let t = make_default_test_outbox().await;
    t.outbox.register_queue(&db, "q", 1).await.unwrap();
    let conn = db.conn().unwrap();
    // One byte past the 64 KiB limit.
    let payload = vec![0u8; 64 * 1024 + 1];
    let err = t
        .outbox
        .enqueue(&conn, "q", 0, payload, "bin")
        .await
        .unwrap_err();
    assert!(matches!(err, OutboxError::PayloadTooLarge { .. }));
}
/// Enqueuing to a queue that was never registered must fail with
/// `QueueNotRegistered`.
#[tokio::test]
async fn enqueue_unregistered_queue_rejected() {
    let db = setup_db("ch2_unreg").await;
    let t = make_default_test_outbox().await;
    let conn = db.conn().unwrap();
    let result = t
        .outbox
        .enqueue(&conn, "nonexistent", 0, b"x".to_vec(), "text/plain")
        .await;
    assert!(matches!(
        result.unwrap_err(),
        OutboxError::QueueNotRegistered(_)
    ));
}
/// Enqueuing to partition 5 of a 4-partition queue must fail with
/// `PartitionOutOfRange`.
#[tokio::test]
async fn enqueue_out_of_range_partition_rejected() {
    let db = setup_db("ch2_oor").await;
    let t = make_default_test_outbox().await;
    t.outbox.register_queue(&db, "q", 4).await.unwrap();
    let conn = db.conn().unwrap();
    let result = t
        .outbox
        .enqueue(&conn, "q", 5, b"x".to_vec(), "text/plain")
        .await;
    assert!(matches!(
        result.unwrap_err(),
        OutboxError::PartitionOutOfRange { .. }
    ));
}
/// A successful `Outbox::transaction` must wake the sequencer via the
/// prioritizer's notifier.
#[tokio::test]
async fn enqueue_transaction_helper_auto_flushes() {
    let db = setup_db("ch2_tx_flush").await;
    let t = make_default_test_outbox().await;
    t.outbox.register_queue(&db, "q", 1).await.unwrap();
    // The `notified()` future is created BEFORE the transaction runs, so a
    // notification fired during commit cannot be missed by the await below.
    let notified = t.prioritizer.notifier();
    let notified = notified.notified();
    let (_db, result) = t
        .outbox
        .transaction(db, |tx| {
            let outbox = Arc::clone(&t.outbox);
            Box::pin(async move {
                outbox
                    .enqueue(tx, "q", 0, b"hello".to_vec(), "text/plain")
                    .await
                    .map_err(|e| anyhow::anyhow!("{e}"))?;
                Ok(())
            })
        })
        .await;
    result.unwrap();
    // The notification must arrive promptly after the commit.
    tokio::time::timeout(Duration::from_millis(100), notified)
        .await
        .expect("sequencer should be notified on successful transaction");
}
/// A rolled-back `Outbox::transaction` must NOT wake the sequencer.
#[tokio::test]
async fn enqueue_transaction_helper_no_flush_on_rollback() {
    let db = setup_db("ch2_tx_no_flush").await;
    let t = make_default_test_outbox().await;
    t.outbox.register_queue(&db, "q", 1).await.unwrap();
    // The closure returns Err, which should trigger a rollback.
    let (_db, result) = t
        .outbox
        .transaction(db, |_tx| {
            Box::pin(async move { Err::<(), _>(anyhow::anyhow!("rollback")) })
        })
        .await;
    assert!(result.is_err());
    // Subscribe after the rollback and assert that no notification arrives
    // within 50ms (timeout elapsing is the expected outcome here).
    let notifier = t.prioritizer.notifier();
    let notified_fut = notifier.notified();
    let timed_out = tokio::time::timeout(Duration::from_millis(50), notified_fut)
        .await
        .is_err();
    assert!(timed_out, "sequencer should NOT be notified on rollback");
}
/// One sequencer pass drains the incoming table into outgoing, assigning
/// contiguous per-partition sequence numbers starting at 1.
#[tokio::test]
async fn sequencer_moves_incoming_to_outgoing() {
    let db = setup_db("ch3_moves").await;
    let t = make_default_test_outbox().await;
    t.outbox.register_queue(&db, "q", 1).await.unwrap();
    enqueue_msgs(&t.outbox, &db, "q", 0, &["a", "b", "c"]).await;
    assert_eq!(count_rows(&db, "modkit_outbox_incoming").await, 3);
    run_sequencer_once(&t, &db).await;
    assert_eq!(count_rows(&db, "modkit_outbox_incoming").await, 0);
    assert_eq!(count_rows(&db, "modkit_outbox_outgoing").await, 3);
    let pid = t.outbox.all_partition_ids()[0];
    let outgoing = read_outgoing(&db, pid).await;
    let seqs: Vec<i64> = outgoing.iter().map(|r| r.seq).collect();
    assert_eq!(seqs, vec![1, 2, 3]);
    for row in &outgoing {
        assert_eq!(row.partition_id, pid);
        assert!(row.id > 0);
        assert!(row.body_id > 0);
    }
    let row_ids: Vec<i64> = outgoing.iter().map(|r| r.id).collect();
    assert_eq!(row_ids.len(), 3);
    assert!(row_ids[0] != row_ids[1] && row_ids[1] != row_ids[2]);
}
/// Sequencing preserves FIFO enqueue order: seq numbers are 1..=8 and the
/// underlying body ids are strictly increasing along the sequence.
#[tokio::test]
async fn sequencer_preserves_enqueue_order_in_sequences() {
    let db = setup_db("ch3_fifo").await;
    let t = make_default_test_outbox().await;
    t.outbox.register_queue(&db, "q", 1).await.unwrap();
    let payloads: Vec<String> = (0..8).map(|i| format!("msg-{i}")).collect();
    let payload_refs: Vec<&str> = payloads.iter().map(String::as_str).collect();
    let enqueue_ids = enqueue_msgs(&t.outbox, &db, "q", 0, &payload_refs).await;
    run_sequencer_once(&t, &db).await;
    let pid = t.outbox.all_partition_ids()[0];
    let outgoing = read_outgoing(&db, pid).await;
    let seqs: Vec<i64> = outgoing.iter().map(|r| r.seq).collect();
    assert_eq!(seqs, vec![1, 2, 3, 4, 5, 6, 7, 8]);
    let body_ids: Vec<i64> = outgoing.iter().map(|r| r.body_id).collect();
    for i in 1..body_ids.len() {
        let (prev, curr) = (body_ids[i - 1], body_ids[i]);
        assert!(
            curr > prev,
            "body_id[{i}]={} should be > body_id[{}]={}",
            curr,
            i - 1,
            prev
        );
    }
    assert_eq!(enqueue_ids.len(), 8);
    assert_eq!(outgoing.len(), 8);
}
/// After sequencing three messages the partition's sequence counter is 3.
#[tokio::test]
async fn sequencer_updates_partition_sequence_counter() {
    let db = setup_db("ch3_seq_counter").await;
    let t = make_default_test_outbox().await;
    t.outbox.register_queue(&db, "q", 1).await.unwrap();
    enqueue_msgs(&t.outbox, &db, "q", 0, &["a", "b", "c"]).await;
    run_sequencer_once(&t, &db).await;
    let pid = t.outbox.all_partition_ids()[0];
    assert_eq!(read_partition_sequence(&db, pid).await, 3);
}
#[tokio::test]
async fn sequencer_multi_partition_independent_sequences() {
let db = setup_db("ch3_multi_part").await;
let t = make_default_test_outbox().await;
t.outbox.register_queue(&db, "q", 2).await.unwrap();
enqueue_msgs(&t.outbox, &db, "q", 0, &["a0", "b0"]).await;
enqueue_msgs(&t.outbox, &db, "q", 1, &["a1", "b1", "c1"]).await;
run_sequencer_once(&t, &db).await;
let ids = t.outbox.all_partition_ids();
let out0 = read_outgoing(&db, ids[0]).await;
let out1 = read_outgoing(&db, ids[1]).await;
let seqs0: Vec<i64> = out0.iter().map(|r| r.seq).collect();
let seqs1: Vec<i64> = out1.iter().map(|r| r.seq).collect();
assert_eq!(seqs0, vec![1, 2]);
assert_eq!(seqs1, vec![1, 2, 3]);
}
/// With nothing to sequence, a single execute yields `Directive::Idle`.
#[tokio::test]
async fn sequencer_empty_incoming_returns_zero() {
    let db = setup_db("ch3_empty").await;
    let t = make_default_test_outbox().await;
    t.outbox.register_queue(&db, "q", 1).await.unwrap();
    let cancel = CancellationToken::new();
    let mut sequencer = make_sequencer(&t, SequencerConfig::default(), &db);
    let directive = sequencer.execute(&cancel).await.unwrap();
    assert!(matches!(directive, Directive::Idle(_)));
}
/// Two enqueue+sequence rounds yield one contiguous sequence 1..=4.
#[tokio::test]
async fn sequencer_consecutive_batches_contiguous_sequences() {
    let db = setup_db("ch3_contiguous").await;
    let t = make_default_test_outbox().await;
    t.outbox.register_queue(&db, "q", 1).await.unwrap();
    for batch in [["a", "b"], ["c", "d"]] {
        enqueue_msgs(&t.outbox, &db, "q", 0, &batch).await;
        run_sequencer_once(&t, &db).await;
    }
    let pid = t.outbox.all_partition_ids()[0];
    let outgoing = read_outgoing(&db, pid).await;
    let seqs: Vec<i64> = outgoing.iter().map(|r| r.seq).collect();
    assert_eq!(seqs, vec![1, 2, 3, 4]);
}
/// With batch_size=2 and two inner iterations, one execute claims at most
/// 4 of the 5 pending rows and re-dirties the partition for the remainder.
#[tokio::test]
async fn sequencer_batch_size_limit_enforced() {
    let db = setup_db("ch3_batch_limit").await;
    let t = make_default_test_outbox().await;
    t.outbox.register_queue(&db, "q", 1).await.unwrap();
    enqueue_msgs(&t.outbox, &db, "q", 0, &["a", "b", "c", "d", "e"]).await;
    let config = SequencerConfig {
        batch_size: 2,
        max_inner_iterations: 2,
        ..Default::default()
    };
    let mut sequencer = make_sequencer(&t, config, &db);
    let cancel = CancellationToken::new();
    let result = sequencer.execute(&cancel).await.unwrap();
    assert!(matches!(result, Directive::Proceed(_)));
    assert_eq!(result.payload().rows_claimed, 4);
    let guard = t
        .prioritizer
        .take()
        .expect("partition should be re-dirtied");
    guard.processed();
    assert_eq!(count_rows(&db, "modkit_outbox_incoming").await, 1);
}
/// A partition whose pending work exceeds one execute's budget must be
/// re-dirtied so a later pass can finish it.
#[tokio::test]
async fn sequencer_saturated_partition_re_dirtied() {
    let db = setup_db("ch3_saturated").await;
    let t = make_default_test_outbox().await;
    t.outbox.register_queue(&db, "q", 1).await.unwrap();
    enqueue_msgs(&t.outbox, &db, "q", 0, &["a", "b", "c"]).await;
    let config = SequencerConfig {
        batch_size: 2,
        max_inner_iterations: 1,
        ..Default::default()
    };
    let mut sequencer = make_sequencer(&t, config, &db);
    let cancel = CancellationToken::new();
    let result = sequencer.execute(&cancel).await.unwrap();
    assert!(matches!(result, Directive::Proceed(_)));
    let guard = t
        .prioritizer
        .take()
        .expect("partition should be re-dirtied");
    guard.processed();
}
/// A partition fully drained within one execute must NOT be re-dirtied.
#[tokio::test]
async fn sequencer_unsaturated_partition_not_re_dirtied() {
    let db = setup_db("ch3_unsaturated").await;
    let t = make_default_test_outbox().await;
    t.outbox.register_queue(&db, "q", 1).await.unwrap();
    enqueue_msgs(&t.outbox, &db, "q", 0, &["a"]).await;
    let config = SequencerConfig {
        batch_size: 100,
        ..Default::default()
    };
    let mut sequencer = make_sequencer(&t, config, &db);
    let cancel = CancellationToken::new();
    let result = sequencer.execute(&cancel).await.unwrap();
    assert!(matches!(result, Directive::Proceed(_)));
    assert!(t.prioritizer.take().is_none());
}
/// Sequencing must touch only partitions that actually have pending rows.
#[tokio::test]
async fn sequencer_skips_empty_partitions() {
    let db = setup_db("ch3_skip_empty").await;
    let t = make_default_test_outbox().await;
    t.outbox.register_queue(&db, "q", 2).await.unwrap();
    enqueue_msgs(&t.outbox, &db, "q", 1, &["only-p1"]).await;
    run_sequencer_once(&t, &db).await;
    let ids = t.outbox.all_partition_ids();
    let out0 = read_outgoing(&db, ids[0]).await;
    assert!(out0.is_empty(), "partition 0 should have no outgoing");
    let out1 = read_outgoing(&db, ids[1]).await;
    assert_eq!(out1.len(), 1, "partition 1 should have 1 outgoing");
}
/// Runs one pass of the [`TransactionalStrategy`] over `partition_id` using
/// `handler`, with up to `msg_batch_size` messages. Panics on strategy error.
async fn run_transactional(
    db: &Db,
    partition_id: i64,
    handler: impl TransactionalHandler + 'static,
    msg_batch_size: u32,
) -> Option<super::strategy::ProcessResult> {
    // Scope the connection so it is released before the strategy runs.
    let backend = {
        let conn = db.sea_internal();
        conn.get_database_backend()
    };
    let dialect = Dialect::from(backend);
    let ctx = ProcessContext {
        db,
        backend,
        dialect,
        partition_id,
    };
    let strategy = TransactionalStrategy::new(Box::new(handler));
    strategy.process(&ctx, msg_batch_size).await.unwrap()
}
/// A successful transactional pass over 3 messages advances the cursor to 3
/// and leaves the attempt counter untouched.
#[tokio::test]
async fn transactional_success_advances_cursor() {
    let db = setup_db("ch4_tx_success").await;
    let t = make_default_test_outbox().await;
    t.outbox.register_queue(&db, "q", 1).await.unwrap();
    let pid = t.outbox.all_partition_ids()[0];
    enqueue_and_sequence(&t, &db, "q", 0, &["a", "b", "c"]).await;
    let count = Arc::new(AtomicU32::new(0));
    let handler = CountingTxHandler {
        count: Arc::clone(&count),
    };
    run_transactional(&db, pid, handler, 3).await;
    assert_eq!(count.load(Ordering::Relaxed), 3);
    let state = read_processor_state(&db, pid).await;
    assert_eq!(state.processed_seq, 3);
    assert_eq!(state.attempts, 0);
}
/// A transactional Retry leaves the cursor alone, bumps attempts, and
/// records the handler's reason as the last error.
#[tokio::test]
async fn transactional_retry_increments_attempts() {
    let db = setup_db("ch4_tx_retry").await;
    let t = make_default_test_outbox().await;
    t.outbox.register_queue(&db, "q", 1).await.unwrap();
    let pid = t.outbox.all_partition_ids()[0];
    enqueue_and_sequence(&t, &db, "q", 0, &["msg"]).await;
    run_transactional(&db, pid, AlwaysRetryTxHandler, 10).await;
    let state = read_processor_state(&db, pid).await;
    assert_eq!(state.processed_seq, 0, "cursor not advanced");
    assert_eq!(state.attempts, 1);
    assert_eq!(state.last_error.as_deref(), Some("transient tx failure"));
}
/// A transactional Reject moves the message to the dead-letter table with
/// full payload/metadata and advances the cursor past it.
#[tokio::test]
async fn transactional_reject_creates_dead_letter_and_advances() {
    let db = setup_db("ch4_tx_reject").await;
    let t = make_default_test_outbox().await;
    t.outbox.register_queue(&db, "q", 1).await.unwrap();
    let pid = t.outbox.all_partition_ids()[0];
    enqueue_and_sequence(&t, &db, "q", 0, &["poison"]).await;
    run_transactional(&db, pid, AlwaysRejectTxHandler, 10).await;
    let state = read_processor_state(&db, pid).await;
    assert_eq!(state.processed_seq, 1, "cursor advanced past rejected msg");
    let dead = read_dead_letters(&db).await;
    assert_eq!(dead.len(), 1);
    let dl = &dead[0];
    assert!(dl.id > 0);
    assert_eq!(dl.partition_id, pid);
    assert_eq!(dl.seq, 1);
    assert_eq!(dl.last_error.as_deref(), Some("permanently bad tx"));
    assert_eq!(dl.payload, b"poison");
    assert_eq!(dl.payload_type, "text/plain");
    assert_eq!(dl.attempts, 0);
    assert_eq!(dl.status, "pending");
}
/// One transactional pass delivers all 3 sequenced messages to the handler.
#[tokio::test]
async fn transactional_batch_processes_multiple_in_single_tx() {
    let db = setup_db("ch4_tx_batch").await;
    let t = make_default_test_outbox().await;
    t.outbox.register_queue(&db, "q", 1).await.unwrap();
    let pid = t.outbox.all_partition_ids()[0];
    enqueue_and_sequence(&t, &db, "q", 0, &["a", "b", "c"]).await;
    let count = Arc::new(AtomicU32::new(0));
    let handler = CountingTxHandler {
        count: Arc::clone(&count),
    };
    run_transactional(&db, pid, handler, 3).await;
    assert_eq!(count.load(Ordering::Relaxed), 3);
}
/// Runs one pass of the [`LeasedStrategy`] over `partition_id` using
/// `handler`, with worker id "test-AAAAAA", the given lease duration, a 2s
/// headroom, and up to `msg_batch_size` messages. Panics on strategy error.
async fn run_leased(
    db: &Db,
    partition_id: i64,
    handler: impl LeasedHandler + 'static,
    lease_duration: Duration,
    msg_batch_size: u32,
) -> Option<super::strategy::ProcessResult> {
    // Scope the connection so it is released before the strategy runs.
    let backend = {
        let conn = db.sea_internal();
        conn.get_database_backend()
    };
    let dialect = Dialect::from(backend);
    let lease = LeaseConfig {
        duration: lease_duration,
        headroom: Duration::from_secs(2),
    };
    let strategy = LeasedStrategy::new(Arc::new(handler), "test-AAAAAA".to_owned(), lease);
    let ctx = ProcessContext {
        db,
        backend,
        dialect,
        partition_id,
    };
    strategy.process(&ctx, msg_batch_size).await.unwrap()
}
/// Repeated leased polls of an empty partition must not increment the
/// processor's attempt counter.
#[tokio::test]
async fn idle_poll_does_not_ratchet_attempts() {
    let db = setup_db("ch5_idle_attempts").await;
    let t = make_default_test_outbox().await;
    t.outbox.register_queue(&db, "q", 1).await.unwrap();
    let pid = t.outbox.all_partition_ids()[0];
    let count = Arc::new(AtomicU32::new(0));
    for _ in 0..5 {
        let handler = CountingSuccessHandler {
            count: Arc::clone(&count),
        };
        run_leased(&db, pid, handler, Duration::from_secs(30), 10).await;
    }
    assert_eq!(count.load(Ordering::Relaxed), 0);
    let state = read_processor_state(&db, pid).await;
    assert_eq!(
        state.attempts, 0,
        "idle poll cycles should not accumulate attempts (crash trace is reset by lease_release)"
    );
}
/// A successful leased pass processes both messages, advances the cursor to
/// 2, and fully releases the lease.
#[tokio::test]
async fn decoupled_success_advances_cursor_and_releases_lease() {
    let db = setup_db("ch5_dc_success").await;
    let t = make_default_test_outbox().await;
    t.outbox.register_queue(&db, "q", 1).await.unwrap();
    let pid = t.outbox.all_partition_ids()[0];
    enqueue_and_sequence(&t, &db, "q", 0, &["a", "b"]).await;
    let count = Arc::new(AtomicU32::new(0));
    let handler = CountingSuccessHandler {
        count: Arc::clone(&count),
    };
    run_leased(&db, pid, handler, Duration::from_secs(30), 2).await;
    assert_eq!(count.load(Ordering::Relaxed), 2);
    let state = read_processor_state(&db, pid).await;
    assert_eq!(state.processed_seq, 2);
    assert_eq!(state.attempts, 0);
    assert!(state.locked_by.is_none(), "lease released");
    assert!(state.locked_until.is_none(), "lease released");
}
/// A leased Retry leaves the cursor in place, records one attempt plus the
/// retry reason, and still releases the lease.
#[tokio::test]
async fn decoupled_retry_preserves_cursor_and_releases_lease() {
    let db = setup_db("ch5_dc_retry").await;
    let t = make_default_test_outbox().await;
    t.outbox.register_queue(&db, "q", 1).await.unwrap();
    let pid = t.outbox.all_partition_ids()[0];
    enqueue_and_sequence(&t, &db, "q", 0, &["msg"]).await;
    run_leased(&db, pid, AlwaysRetryHandler, Duration::from_secs(30), 10).await;
    let state = read_processor_state(&db, pid).await;
    assert_eq!(state.processed_seq, 0, "cursor unchanged");
    assert_eq!(state.attempts, 1, "attempts incremented by lease_acquire");
    assert_eq!(
        state.last_error.as_deref(),
        Some("message handler returned Retry")
    );
    assert!(state.locked_by.is_none(), "lease released");
}
/// A leased Reject dead-letters the message, advances the cursor past it,
/// and releases the lease.
#[tokio::test]
async fn decoupled_reject_creates_dead_letter_and_releases_lease() {
    let db = setup_db("ch5_dc_reject").await;
    let t = make_default_test_outbox().await;
    t.outbox.register_queue(&db, "q", 1).await.unwrap();
    let pid = t.outbox.all_partition_ids()[0];
    enqueue_and_sequence(&t, &db, "q", 0, &["bad"]).await;
    run_leased(&db, pid, AlwaysRejectHandler, Duration::from_secs(30), 10).await;
    let state = read_processor_state(&db, pid).await;
    assert_eq!(state.processed_seq, 1, "cursor advanced past rejected");
    assert!(state.locked_by.is_none(), "lease released");
    let dead = read_dead_letters(&db).await;
    assert_eq!(dead.len(), 1);
    assert_eq!(dead[0].last_error.as_deref(), Some("permanently bad"));
}
// Leased mode: polling an empty partition yields no result, invokes the
// handler zero times, and still releases the lease it briefly took.
#[tokio::test]
async fn decoupled_empty_partition_releases_lease() {
let db = setup_db("ch5_dc_empty").await;
let t = make_default_test_outbox().await;
t.outbox.register_queue(&db, "q", 1).await.unwrap();
let pid = t.outbox.all_partition_ids()[0];
let count = Arc::new(AtomicU32::new(0));
let result = run_leased(
&db,
pid,
CountingSuccessHandler {
count: count.clone(),
},
Duration::from_secs(30),
10,
)
.await;
assert!(result.is_none(), "no work done");
assert_eq!(count.load(Ordering::Relaxed), 0);
let snap = read_processor_state(&db, pid).await;
assert!(snap.locked_by.is_none(), "lease released after empty");
}
// Leased mode: repeated idle poll cycles on an empty partition must not leak
// attempt counts into the processor row.
#[tokio::test]
async fn decoupled_empty_partition_does_not_accumulate_attempts() {
let db = setup_db("ch5_dc_empty_attempts").await;
let t = make_default_test_outbox().await;
t.outbox.register_queue(&db, "q", 1).await.unwrap();
let pid = t.outbox.all_partition_ids()[0];
// Five consecutive empty cycles, each verifying the handler never ran.
for _ in 0..5 {
let count = Arc::new(AtomicU32::new(0));
run_leased(
&db,
pid,
CountingSuccessHandler {
count: count.clone(),
},
Duration::from_secs(30),
10,
)
.await;
assert_eq!(count.load(Ordering::Relaxed), 0);
}
let snap = read_processor_state(&db, pid).await;
assert_eq!(
snap.attempts, 0,
"attempts should be 0 after empty lease cycles, not accumulated"
);
}
// Leased mode: a per-message handler (CountingMessageHandler) is invoked once
// per message in the batch rather than once per batch.
#[tokio::test]
async fn decoupled_each_message_adapter_processes_individually() {
let db = setup_db("ch5_dc_each").await;
let t = make_default_test_outbox().await;
t.outbox.register_queue(&db, "q", 1).await.unwrap();
let pid = t.outbox.all_partition_ids()[0];
enqueue_and_sequence(&t, &db, "q", 0, &["a", "b", "c"]).await;
let count = Arc::new(AtomicU32::new(0));
let handler = CountingMessageHandler {
count: count.clone(),
};
run_leased(&db, pid, handler, Duration::from_secs(30), 3).await;
assert_eq!(count.load(Ordering::Relaxed), 3);
}
// Crash semantics: a simulated crash mid-processing leaves the attempt
// increment from lease_acquire in the DB, with the cursor untouched and the
// lease still held by the dead pod.
#[tokio::test]
async fn crash_leaves_incremented_attempts_in_db() {
let db = setup_db("ch6_crash_trace").await;
let t = make_default_test_outbox().await;
t.outbox.register_queue(&db, "q", 1).await.unwrap();
let pid = t.outbox.all_partition_ids()[0];
enqueue_and_sequence(&t, &db, "q", 0, &["msg"]).await;
simulate_crash(&db, pid, 300).await;
let snap = read_processor_state(&db, pid).await;
assert_eq!(snap.attempts, 1, "crash left incremented attempts");
assert_eq!(snap.processed_seq, 0, "cursor unchanged");
assert!(snap.locked_by.is_some(), "lease still held by crashed pod");
}
// Crash recovery: once the stale lease expires, the next worker observes the
// crash's attempt count (so handlers can see "this was tried before"), and a
// successful run resets attempts back to 0.
#[tokio::test]
async fn recovery_after_crash_sees_nonzero_attempts() {
let db = setup_db("ch6_recovery").await;
let t = make_default_test_outbox().await;
t.outbox.register_queue(&db, "q", 1).await.unwrap();
let pid = t.outbox.all_partition_ids()[0];
enqueue_and_sequence(&t, &db, "q", 0, &["msg"]).await;
simulate_crash(&db, pid, 300).await;
expire_lease(&db, pid).await;
let seen = Arc::new(Mutex::new(Vec::new()));
let handler = AttemptsRecorder {
seen_attempts: seen.clone(),
};
run_leased(&db, pid, handler, Duration::from_secs(30), 10).await;
{
// Scope the lock guard so it drops before the next await point.
let recorded = seen.lock().unwrap();
assert_eq!(recorded.len(), 1);
assert_eq!(recorded[0], 1, "handler sees attempts=1 from the crash");
}
let snap = read_processor_state(&db, pid).await;
assert_eq!(snap.attempts, 0);
}
// Crash semantics: each crash/expire cycle adds one to the attempt count;
// two crashes leave attempts=2.
#[tokio::test]
async fn multiple_crashes_accumulate_attempts() {
let db = setup_db("ch6_multi_crash").await;
let t = make_default_test_outbox().await;
t.outbox.register_queue(&db, "q", 1).await.unwrap();
let pid = t.outbox.all_partition_ids()[0];
enqueue_and_sequence(&t, &db, "q", 0, &["msg"]).await;
simulate_crash(&db, pid, 300).await;
expire_lease(&db, pid).await;
simulate_crash(&db, pid, 300).await;
expire_lease(&db, pid).await;
let snap = read_processor_state(&db, pid).await;
assert_eq!(snap.attempts, 2, "two crashes accumulated");
}
// A single Retry cycle leaves attempts=1: only lease_acquire increments, the
// retry path itself must not add a second increment.
#[tokio::test]
async fn retry_does_not_double_increment_attempts() {
let db = setup_db("ch6_no_double").await;
let t = make_default_test_outbox().await;
t.outbox.register_queue(&db, "q", 1).await.unwrap();
let pid = t.outbox.all_partition_ids()[0];
enqueue_and_sequence(&t, &db, "q", 0, &["msg"]).await;
run_leased(&db, pid, AlwaysRetryHandler, Duration::from_secs(30), 10).await;
let snap = read_processor_state(&db, pid).await;
assert_eq!(snap.attempts, 1, "not 2 - retry doesn't double-increment");
}
// Crash recovery: after a crash leaves attempts=1, a successful processing
// run both advances the cursor and resets attempts to 0.
#[tokio::test]
async fn success_after_crash_resets_attempts() {
let db = setup_db("ch6_reset").await;
let t = make_default_test_outbox().await;
t.outbox.register_queue(&db, "q", 1).await.unwrap();
let pid = t.outbox.all_partition_ids()[0];
enqueue_and_sequence(&t, &db, "q", 0, &["msg"]).await;
simulate_crash(&db, pid, 300).await;
expire_lease(&db, pid).await;
// Precondition: the crash trace is visible before recovery.
let snap = read_processor_state(&db, pid).await;
assert_eq!(snap.attempts, 1);
let count = Arc::new(AtomicU32::new(0));
run_leased(
&db,
pid,
CountingSuccessHandler {
count: count.clone(),
},
Duration::from_secs(30),
10,
)
.await;
let snap = read_processor_state(&db, pid).await;
assert_eq!(snap.attempts, 0, "success resets attempts to 0");
assert_eq!(snap.processed_seq, 1);
}
// Adaptive batching: with batch size 1, a poison message at seq 2 is isolated
// and dead-lettered while the three healthy messages complete normally.
//
// The original body repeated the identical `run_leased` call four times
// verbatim; the loop below is the same four cycles without the copy-paste.
#[tokio::test]
async fn adaptive_batch_isolates_poison_message() {
    let db = setup_db("ch7_poison").await;
    let t = make_default_test_outbox().await;
    t.outbox.register_queue(&db, "q", 1).await.unwrap();
    let pid = t.outbox.all_partition_ids()[0];
    enqueue_and_sequence(&t, &db, "q", 0, &["ok1", "poison", "ok3", "ok4"]).await;
    // One lease cycle per message. Every cycle reports Success at the batch
    // level — including the cycle that hits the poison seq, which is resolved
    // by dead-lettering rather than by failing the batch.
    for _ in 0..4 {
        let r = run_leased(
            &db,
            pid,
            PoisonMessageHandler {
                poison_seqs: vec![2],
            },
            Duration::from_secs(30),
            1,
        )
        .await;
        assert!(matches!(r.unwrap().handler_result, HandlerResult::Success));
    }
    let snap = read_processor_state(&db, pid).await;
    assert_eq!(snap.processed_seq, 4, "all 4 messages processed");
    let dls = read_dead_letters(&db).await;
    assert_eq!(dls.len(), 1, "only the poison message was dead-lettered");
    assert_eq!(dls[0].seq, 2);
}
// Test-only vacuum driver: deletes outgoing rows (and their body rows) for a
// partition up to and including its processed cursor, working in chunks, then
// resets the partition's vacuum counter. Mirrors what the production vacuum
// worker does, but runs synchronously so tests can assert on the result.
async fn run_vacuum(db: &Db, partition_id: i64) {
// Row shape for reading the partition's processed cursor.
#[derive(Debug, FromQueryResult)]
struct ProcRow {
processed_seq: i64,
}
let conn = db.sea_internal();
// Derive backend + dialect from the live connection rather than assuming
// sqlite, so the helper works against any configured backend.
let backend = conn.get_database_backend();
let dialect = Dialect::from(backend);
let proc_row = ProcRow::find_by_statement(Statement::from_sql_and_values(
backend,
"SELECT processed_seq FROM modkit_outbox_processor WHERE partition_id = $1",
[partition_id.into()],
))
.one(&conn)
.await
.unwrap()
.unwrap();
// Nothing has been processed yet; there is nothing safe to delete.
if proc_row.processed_seq == 0 {
return;
}
let vacuum_sql = dialect.vacuum_cleanup();
// Chunked delete loop: select up to 10_000 processed (id, body_id) pairs,
// delete them, repeat until the select returns no rows.
loop {
let rows = conn
.query_all(Statement::from_sql_and_values(
backend,
vacuum_sql.select_outgoing_chunk,
[
partition_id.into(),
proc_row.processed_seq.into(),
10_000i64.into(),
],
))
.await
.unwrap();
if rows.is_empty() {
break;
}
// Column 0 = outgoing row id, column 1 = body row id.
let outgoing_ids: Vec<i64> = rows
.iter()
.filter_map(|r| r.try_get_by_index::<i64>(0).ok())
.collect();
let body_ids: Vec<i64> = rows
.iter()
.filter_map(|r| r.try_get_by_index::<i64>(1).ok())
.collect();
let del_out = dialect.build_delete_outgoing_batch(outgoing_ids.len());
let values: Vec<sea_orm::Value> = outgoing_ids.iter().map(|&id| id.into()).collect();
conn.execute(Statement::from_sql_and_values(backend, &del_out, values))
.await
.unwrap();
// body_ids can be empty when column 1 failed to decode for every row;
// skip the body delete in that case.
if !body_ids.is_empty() {
let del_body = dialect.build_delete_body_batch(body_ids.len());
let values: Vec<sea_orm::Value> = body_ids.iter().map(|&id| id.into()).collect();
conn.execute(Statement::from_sql_and_values(backend, &del_body, values))
.await
.unwrap();
}
}
// Vacuum is complete: zero the partition's pending-vacuum counter.
conn.execute(Statement::from_sql_and_values(
backend,
dialect.reset_vacuum_counter(),
[partition_id.into()],
))
.await
.unwrap();
}
// Vacuum: after all messages are processed, a vacuum pass removes every
// outgoing and body row while leaving the processor cursor intact.
#[tokio::test]
async fn vacuum_deletes_processed_outgoing_and_body_rows() {
let db = setup_db("ch8_vacuum_deletes").await;
let t = make_default_test_outbox().await;
t.outbox.register_queue(&db, "q", 1).await.unwrap();
let pid = t.outbox.all_partition_ids()[0];
enqueue_and_sequence(&t, &db, "q", 0, &["a", "b", "c"]).await;
let count = Arc::new(AtomicU32::new(0));
run_leased(
&db,
pid,
CountingSuccessHandler {
count: count.clone(),
},
Duration::from_secs(30),
3,
)
.await;
run_vacuum(&db, pid).await;
assert_eq!(count_rows(&db, "modkit_outbox_outgoing").await, 0);
assert_eq!(count_rows(&db, "modkit_outbox_body").await, 0);
let snap = read_processor_state(&db, pid).await;
assert_eq!(snap.processed_seq, 3, "cursor preserved");
}
// Vacuum: when nothing has been processed (processed_seq == 0), vacuum is a
// no-op and sequenced-but-unprocessed rows survive.
#[tokio::test]
async fn vacuum_skips_when_processed_seq_is_zero() {
let db = setup_db("ch8_vacuum_skip").await;
let t = make_default_test_outbox().await;
t.outbox.register_queue(&db, "q", 1).await.unwrap();
let pid = t.outbox.all_partition_ids()[0];
enqueue_and_sequence(&t, &db, "q", 0, &["a"]).await;
run_vacuum(&db, pid).await;
assert_eq!(
count_rows(&db, "modkit_outbox_outgoing").await,
1,
"rows preserved"
);
}
// Vacuum: with 5 messages but only 3 processed, vacuum deletes exactly the
// processed prefix (seqs 1-3) and leaves seqs 4-5 untouched.
#[tokio::test]
async fn vacuum_preserves_unprocessed_rows() {
let db = setup_db("ch8_vacuum_preserves").await;
let t = make_default_test_outbox().await;
t.outbox.register_queue(&db, "q", 1).await.unwrap();
let pid = t.outbox.all_partition_ids()[0];
enqueue_and_sequence(&t, &db, "q", 0, &["a", "b", "c", "d", "e"]).await;
let count = Arc::new(AtomicU32::new(0));
run_leased(
&db,
pid,
CountingSuccessHandler {
count: count.clone(),
},
Duration::from_secs(30),
3,
)
.await;
let snap = read_processor_state(&db, pid).await;
assert_eq!(snap.processed_seq, 3);
run_vacuum(&db, pid).await;
let remaining = read_outgoing(&db, pid).await;
assert_eq!(remaining.len(), 2);
let seqs: Vec<i64> = remaining.iter().map(|r| r.seq).collect();
assert_eq!(seqs, vec![4, 5]);
// Sanity-check the surviving rows still reference valid ids.
for row in &remaining {
assert_eq!(row.partition_id, pid);
assert!(row.id > 0);
assert!(row.body_id > 0);
}
}
// Reads the pending-vacuum counter for one partition.
//
// Panics (via expect) if the query fails or the counter row is missing —
// register_queue is expected to have created it.
async fn read_vacuum_counter(db: &Db, partition_id: i64) -> i64 {
    // Row shape for the single-column counter query.
    #[derive(Debug, FromQueryResult)]
    struct Row {
        counter: i64,
    }
    let conn = db.sea_internal();
    // Derive the backend from the connection (like `run_vacuum` does) instead
    // of hard-coding `DbBackend::Sqlite`, keeping this helper backend-agnostic.
    let backend = conn.get_database_backend();
    Row::find_by_statement(Statement::from_sql_and_values(
        backend,
        "SELECT counter FROM modkit_outbox_vacuum_counter WHERE partition_id = $1",
        [partition_id.into()],
    ))
    .one(&conn)
    .await
    .expect("query")
    .expect("vacuum counter row")
    .counter
}
// Overwrites the pending-vacuum counter for one partition (used by tests to
// simulate concurrent bumps or stale values).
async fn set_vacuum_counter(db: &Db, partition_id: i64, value: i64) {
    let conn = db.sea_internal();
    // Derive the backend from the connection (like `run_vacuum` does) instead
    // of hard-coding `DbBackend::Sqlite`, keeping this helper backend-agnostic.
    let backend = conn.get_database_backend();
    conn.execute(Statement::from_sql_and_values(
        backend,
        "UPDATE modkit_outbox_vacuum_counter SET counter = $1 WHERE partition_id = $2",
        [value.into(), partition_id.into()],
    ))
    .await
    .unwrap();
}
// Vacuum counter: starts at 0 on registration, is bumped to 1 when the
// processed cursor advances, and is NOT bumped again by an idle cycle that
// processes nothing.
#[tokio::test]
async fn vacuum_counter_bumped_on_processed_seq_advance() {
let db = setup_db("ch8_counter_bump").await;
let t = make_default_test_outbox().await;
t.outbox.register_queue(&db, "q", 1).await.unwrap();
let pid = t.outbox.all_partition_ids()[0];
assert_eq!(read_vacuum_counter(&db, pid).await, 0);
enqueue_and_sequence(&t, &db, "q", 0, &["a", "b"]).await;
let count = Arc::new(AtomicU32::new(0));
run_leased(
&db,
pid,
CountingSuccessHandler {
count: count.clone(),
},
Duration::from_secs(30),
2,
)
.await;
assert_eq!(read_vacuum_counter(&db, pid).await, 1);
// Second cycle finds the partition empty: counter stays at 1.
run_leased(
&db,
pid,
CountingSuccessHandler {
count: count.clone(),
},
Duration::from_secs(30),
2,
)
.await;
assert_eq!(read_vacuum_counter(&db, pid).await, 1);
}
// Vacuum counter: a vacuum run resets the counter to 0 even if other workers
// bumped it above 1 in the meantime (simulated via set_vacuum_counter).
#[tokio::test]
async fn vacuum_counter_preserves_concurrent_bumps() {
let db = setup_db("ch8_counter_concurrent").await;
let t = make_default_test_outbox().await;
t.outbox.register_queue(&db, "q", 1).await.unwrap();
let pid = t.outbox.all_partition_ids()[0];
enqueue_and_sequence(&t, &db, "q", 0, &["a", "b", "c"]).await;
let count = Arc::new(AtomicU32::new(0));
run_leased(
&db,
pid,
CountingSuccessHandler {
count: count.clone(),
},
Duration::from_secs(30),
3,
)
.await;
assert_eq!(read_vacuum_counter(&db, pid).await, 1);
// Simulate extra bumps from concurrent processors.
set_vacuum_counter(&db, pid, 3).await;
run_vacuum(&db, pid).await;
assert_eq!(count_rows(&db, "modkit_outbox_outgoing").await, 0);
assert_eq!(read_vacuum_counter(&db, pid).await, 0);
}
// Vacuum counter: a stale nonzero counter with nothing left to delete is
// still reset to 0 by a vacuum pass.
#[tokio::test]
async fn vacuum_stale_counter_reset() {
let db = setup_db("ch8_stale_counter").await;
let t = make_default_test_outbox().await;
t.outbox.register_queue(&db, "q", 1).await.unwrap();
let pid = t.outbox.all_partition_ids()[0];
enqueue_and_sequence(&t, &db, "q", 0, &["a"]).await;
let count = Arc::new(AtomicU32::new(0));
run_leased(
&db,
pid,
CountingSuccessHandler {
count: count.clone(),
},
Duration::from_secs(30),
10,
)
.await;
run_vacuum(&db, pid).await;
assert_eq!(read_vacuum_counter(&db, pid).await, 0);
assert_eq!(count_rows(&db, "modkit_outbox_outgoing").await, 0);
// Plant a stale counter value with no corresponding rows to delete.
set_vacuum_counter(&db, pid, 5).await;
run_vacuum(&db, pid).await;
assert_eq!(read_vacuum_counter(&db, pid).await, 0);
}
// Vacuum counter rows are created (value 0) for every partition on
// register_queue, and re-registering the same queue is idempotent.
#[tokio::test]
async fn vacuum_counter_row_created_on_register_queue() {
let db = setup_db("ch8_counter_register").await;
let t = make_default_test_outbox().await;
t.outbox.register_queue(&db, "q", 2).await.unwrap();
let pids = t.outbox.all_partition_ids();
assert_eq!(pids.len(), 2);
for &pid in &pids {
assert_eq!(read_vacuum_counter(&db, pid).await, 0);
}
// Re-registering must not disturb the existing counter rows.
t.outbox.register_queue(&db, "q", 2).await.unwrap();
for &pid in &pids {
assert_eq!(read_vacuum_counter(&db, pid).await, 0);
}
}
// Test helper: enqueues `payloads` on `queue`/`partition` and processes them
// with AlwaysRejectHandler, so every payload ends up as a dead letter with
// last_error "permanently bad".
async fn create_dead_letters(
t: &TestOutbox,
db: &Db,
queue: &str,
partition: u32,
payloads: &[&str],
) {
enqueue_and_sequence(t, db, queue, partition, payloads).await;
let ids = t.outbox.all_partition_ids();
let pid = ids[partition as usize];
run_leased(
db,
pid,
AlwaysRejectHandler,
Duration::from_secs(30),
// Batch size sized to consume all payloads in one cycle.
u32::try_from(payloads.len()).unwrap(),
)
.await;
}
// Dead-letter listing: every row carries the partition id, the reject reason,
// Pending status, and no completion timestamp.
#[tokio::test]
async fn dead_letter_list_returns_correct_fields() {
let db = setup_db("ch9_dl_list").await;
let t = make_default_test_outbox().await;
t.outbox.register_queue(&db, "q", 1).await.unwrap();
let pid = t.outbox.all_partition_ids()[0];
create_dead_letters(&t, &db, "q", 0, &["a", "b", "c"]).await;
let items = t
.outbox
.dead_letter_list(&db.conn().unwrap(), &DeadLetterFilter::default())
.await
.unwrap();
assert_eq!(items.len(), 3);
for item in &items {
assert_eq!(item.partition_id, pid);
assert_eq!(item.last_error.as_deref(), Some("permanently bad"));
assert_eq!(item.status, super::dead_letter::DeadLetterStatus::Pending);
assert!(item.completed_at.is_none());
}
}
// dead_letter_count with the default filter agrees with the number of rows
// created by create_dead_letters.
#[tokio::test]
async fn dead_letter_count_matches_list() {
let db = setup_db("ch9_dl_count").await;
let t = make_default_test_outbox().await;
t.outbox.register_queue(&db, "q", 1).await.unwrap();
create_dead_letters(&t, &db, "q", 0, &["a", "b", "c"]).await;
let count = t
.outbox
.dead_letter_count(&db.conn().unwrap(), &DeadLetterFilter::default())
.await
.unwrap();
assert_eq!(count, 3);
}
// Replay claims the dead letter: the row moves to "reprocessing" status and
// gets a deadline derived from the passed claim duration.
#[tokio::test]
async fn dead_letter_replay_claims_and_sets_reprocessing() {
let db = setup_db("ch9_dl_replay").await;
let t = make_default_test_outbox().await;
t.outbox.register_queue(&db, "q", 1).await.unwrap();
create_dead_letters(&t, &db, "q", 0, &["msg"]).await;
let replayed = t
.outbox
.dead_letter_replay(
&db.conn().unwrap(),
&DeadLetterScope::default(),
Duration::from_mins(1),
)
.await
.unwrap();
assert_eq!(replayed.len(), 1);
let dls = read_dead_letters(&db).await;
assert_eq!(dls.len(), 1);
assert_eq!(dls[0].status, "reprocessing");
assert!(dls[0].deadline.is_some());
}
// Full replay roundtrip: replay claims the row, resolve marks it "resolved"
// with a completion timestamp; the row itself is retained (cleanup is
// separate).
#[tokio::test]
async fn dead_letter_full_replay_roundtrip() {
let db = setup_db("ch9_dl_roundtrip").await;
let t = make_default_test_outbox().await;
t.outbox.register_queue(&db, "q", 1).await.unwrap();
create_dead_letters(&t, &db, "q", 0, &["msg"]).await;
let replayed = t
.outbox
.dead_letter_replay(
&db.conn().unwrap(),
&DeadLetterScope::default(),
Duration::from_mins(1),
)
.await
.unwrap();
assert_eq!(replayed.len(), 1);
let ids: Vec<i64> = replayed.iter().map(|m| m.id).collect();
let resolved = t
.outbox
.dead_letter_resolve(&db.conn().unwrap(), &ids)
.await
.unwrap();
assert_eq!(resolved, 1);
let dls = read_dead_letters(&db).await;
assert_eq!(dls.len(), 1);
assert_eq!(dls[0].status, "resolved");
assert!(dls[0].completed_at.is_some());
}
// Cleanup deletes only terminal rows: with one resolved and one still
// pending, cleanup removes exactly the resolved one.
#[tokio::test]
async fn dead_letter_cleanup_only_terminal() {
let db = setup_db("ch9_dl_cleanup_soft").await;
let t = make_default_test_outbox().await;
t.outbox.register_queue(&db, "q", 1).await.unwrap();
create_dead_letters(&t, &db, "q", 0, &["a", "b"]).await;
// Claim and resolve just one of the two dead letters.
let scope_one = DeadLetterScope::default().limit(1);
let replayed = t
.outbox
.dead_letter_replay(&db.conn().unwrap(), &scope_one, Duration::from_mins(1))
.await
.unwrap();
let ids: Vec<i64> = replayed.iter().map(|m| m.id).collect();
t.outbox
.dead_letter_resolve(&db.conn().unwrap(), &ids)
.await
.unwrap();
let deleted = t
.outbox
.dead_letter_cleanup(&db.conn().unwrap(), &DeadLetterScope::default())
.await
.unwrap();
assert_eq!(deleted, 1);
let remaining = t
.outbox
.dead_letter_count(&db.conn().unwrap(), &DeadLetterFilter::default())
.await
.unwrap();
assert_eq!(remaining, 1);
}
// Discard marks all rows terminal, so a following cleanup removes every one
// of them; counting with any_status confirms nothing is left.
#[tokio::test]
async fn dead_letter_discard_then_cleanup_deletes_all() {
let db = setup_db("ch9_dl_discard_cleanup").await;
let t = make_default_test_outbox().await;
t.outbox.register_queue(&db, "q", 1).await.unwrap();
create_dead_letters(&t, &db, "q", 0, &["a", "b", "c"]).await;
let discarded = t
.outbox
.dead_letter_discard(&db.conn().unwrap(), &DeadLetterScope::default())
.await
.unwrap();
assert_eq!(discarded, 3);
let cleaned = t
.outbox
.dead_letter_cleanup(&db.conn().unwrap(), &DeadLetterScope::default())
.await
.unwrap();
assert_eq!(cleaned, 3);
let remaining = t
.outbox
.dead_letter_count(
&db.conn().unwrap(),
&DeadLetterFilter::default().any_status(),
)
.await
.unwrap();
assert_eq!(remaining, 0);
}
// Listing with a partition filter returns only the dead letters created on
// that partition (1 on partition 0, 2 on partition 1).
#[tokio::test]
async fn dead_letter_filter_by_partition() {
let db = setup_db("ch9_dl_filter_part").await;
let t = make_default_test_outbox().await;
t.outbox.register_queue(&db, "q", 2).await.unwrap();
let ids = t.outbox.all_partition_ids();
create_dead_letters(&t, &db, "q", 0, &["a0"]).await;
create_dead_letters(&t, &db, "q", 1, &["b1", "b2"]).await;
let filter_p0 = DeadLetterFilter::default().partition(ids[0]);
let items = t
.outbox
.dead_letter_list(&db.conn().unwrap(), &filter_p0)
.await
.unwrap();
assert_eq!(items.len(), 1);
let filter_p1 = DeadLetterFilter::default().partition(ids[1]);
let items = t
.outbox
.dead_letter_list(&db.conn().unwrap(), &filter_p1)
.await
.unwrap();
assert_eq!(items.len(), 2);
}
// Listing with limit(2) caps the result at two rows even though five dead
// letters exist.
#[tokio::test]
async fn dead_letter_filter_with_limit() {
let db = setup_db("ch9_dl_filter_limit").await;
let t = make_default_test_outbox().await;
t.outbox.register_queue(&db, "q", 1).await.unwrap();
create_dead_letters(&t, &db, "q", 0, &["a", "b", "c", "d", "e"]).await;
let filter = DeadLetterFilter::default().limit(2);
let items = t
.outbox
.dead_letter_list(&db.conn().unwrap(), &filter)
.await
.unwrap();
assert_eq!(items.len(), 2);
}
// End-to-end graceful shutdown: a running outbox processes everything that
// was enqueued before stop(), stop() returns promptly, and nothing more is
// processed afterwards.
#[tokio::test]
async fn graceful_shutdown_completes_current_batch() {
// Handler that signals when it first runs, then takes 50ms per message so
// a batch is plausibly "in flight" when shutdown is requested.
struct SlowHandler {
processed: Arc<AtomicU32>,
entered: Arc<tokio::sync::Notify>,
}
#[async_trait::async_trait]
impl LeasedMessageHandler for SlowHandler {
async fn handle(&self, _msg: &OutboxMessage) -> MessageResult {
self.entered.notify_one();
tokio::time::sleep(Duration::from_millis(50)).await;
self.processed.fetch_add(1, Ordering::SeqCst);
MessageResult::Ok
}
}
let db = setup_db("ch10_graceful_shutdown").await;
let processed = Arc::new(AtomicU32::new(0));
let handler_entered = Arc::new(tokio::sync::Notify::new());
let handle = Outbox::builder(db.clone())
.processor_tuning(
WorkerTuning::processor_default()
.idle_interval(Duration::from_millis(50))
.batch_size(5),
)
.sequencer_tuning(
WorkerTuning::sequencer_default().idle_interval(Duration::from_millis(50)),
)
.queue("q", Partitions::of(1))
.leased(SlowHandler {
processed: processed.clone(),
entered: handler_entered.clone(),
})
.start()
.await
.unwrap();
let outbox = handle.outbox();
// Same shared-cache in-memory name as `db`, so this is a second handle to
// the SAME database — it gives us an independent connection to enqueue on.
let db2 = setup_db("ch10_graceful_shutdown").await;
let conn = db2.conn().unwrap();
for i in 0..3 {
outbox
.enqueue(&conn, "q", 0, format!("msg-{i}").into_bytes(), "text/plain")
.await
.unwrap();
}
outbox.flush();
// Wait until the handler has demonstrably started working.
tokio::time::timeout(Duration::from_secs(5), handler_entered.notified())
.await
.expect("handler should start processing within 5s");
for i in 3..6 {
outbox
.enqueue(&conn, "q", 0, format!("msg-{i}").into_bytes(), "text/plain")
.await
.unwrap();
}
outbox.flush();
poll_until(
|| {
let c = processed.load(Ordering::SeqCst);
async move { c >= 6 }
},
5000,
)
.await;
assert!(
processed.load(Ordering::SeqCst) >= 6,
"all pre-stop messages should be processed: got {}",
processed.load(Ordering::SeqCst)
);
let stop_result = tokio::time::timeout(Duration::from_secs(5), handle.stop()).await;
assert!(stop_result.is_ok(), "stop() should complete within 5s");
// After stop() the counter must stay frozen: give stray workers 200ms to
// prove they are really gone.
let after_stop = processed.load(Ordering::SeqCst);
tokio::time::sleep(Duration::from_millis(200)).await;
assert_eq!(
processed.load(Ordering::SeqCst),
after_stop,
"no messages should be processed after stop()"
);
}
// Builder smoke test: start a one-queue outbox and stop it immediately
// without enqueuing anything; must not hang or panic.
#[tokio::test]
async fn builder_start_stop_clean() {
let db = setup_db("ch10_start_stop").await;
let count = Arc::new(AtomicU32::new(0));
let handler = CountingMessageHandler {
count: count.clone(),
};
let handle = Outbox::builder(db)
.processor_tuning(
WorkerTuning::processor_default().idle_interval(Duration::from_millis(50)),
)
.sequencer_tuning(
WorkerTuning::sequencer_default().idle_interval(Duration::from_millis(50)),
)
.queue("orders", Partitions::of(1))
.leased(handler)
.start()
.await
.unwrap();
handle.stop().await;
}
// Partitions::of round-trips every supported power-of-two count.
//
// This is pure, synchronous construction — no awaits — so a plain `#[test]`
// suffices; the original `#[tokio::test] async fn` spun up a runtime for
// nothing.
#[test]
fn builder_partitions_of_all_valid_values() {
    for n in [1, 2, 4, 8, 16, 32, 64] {
        let p = Partitions::of(n);
        assert_eq!(p.count(), n);
    }
}
// Builder with two queues ("a" with 1 partition, "b" with 2): each queue's
// handler receives the message enqueued on it.
#[tokio::test]
async fn builder_multiple_queues() {
let db = setup_db("ch10_multi_queue").await;
let count_a = Arc::new(AtomicU32::new(0));
let count_b = Arc::new(AtomicU32::new(0));
let handle = Outbox::builder(db)
.processor_tuning(
WorkerTuning::processor_default().idle_interval(Duration::from_millis(50)),
)
.sequencer_tuning(
WorkerTuning::sequencer_default().idle_interval(Duration::from_millis(50)),
)
.queue("a", Partitions::of(1))
.leased(CountingMessageHandler {
count: count_a.clone(),
})
.queue("b", Partitions::of(2))
.leased(CountingMessageHandler {
count: count_b.clone(),
})
.start()
.await
.unwrap();
let outbox = handle.outbox();
// Same shared-cache in-memory name, so db2 is a second handle to the same
// database used by the running outbox.
let db2 = setup_db("ch10_multi_queue").await;
let conn = db2.conn().unwrap();
outbox
.enqueue(&conn, "a", 0, b"hello-a".to_vec(), "text/plain")
.await
.unwrap();
outbox
.enqueue(&conn, "b", 0, b"hello-b".to_vec(), "text/plain")
.await
.unwrap();
outbox.flush();
poll_until(
|| {
let ca = count_a.load(Ordering::Relaxed);
let cb = count_b.load(Ordering::Relaxed);
async move { ca >= 1 && cb >= 1 }
},
5000,
)
.await;
assert!(count_a.load(Ordering::Relaxed) >= 1);
assert!(count_b.load(Ordering::Relaxed) >= 1);
handle.stop().await;
}
// E2E happy path: enqueue -> sequence -> process -> vacuum leaves every
// outbox table empty, with only the processor cursor recording the work.
#[tokio::test]
async fn e2e_happy_path_enqueue_through_reap() {
let db = setup_db("ch11_happy").await;
let t = make_default_test_outbox().await;
t.outbox.register_queue(&db, "q", 1).await.unwrap();
let pid = t.outbox.all_partition_ids()[0];
enqueue_and_sequence(&t, &db, "q", 0, &["a", "b", "c"]).await;
let count = Arc::new(AtomicU32::new(0));
run_leased(
&db,
pid,
CountingSuccessHandler {
count: count.clone(),
},
Duration::from_secs(30),
3,
)
.await;
run_vacuum(&db, pid).await;
assert_eq!(count_rows(&db, "modkit_outbox_incoming").await, 0);
assert_eq!(count_rows(&db, "modkit_outbox_outgoing").await, 0);
assert_eq!(count_rows(&db, "modkit_outbox_body").await, 0);
assert_eq!(count_rows(&db, "modkit_outbox_dead_letters").await, 0);
let snap = read_processor_state(&db, pid).await;
assert_eq!(snap.processed_seq, 3);
assert_eq!(snap.attempts, 0);
}
// E2E retry path: two failed (Retry) cycles accumulate attempts=2 without
// moving the cursor; a subsequent successful cycle advances the cursor and
// resets attempts.
#[tokio::test]
async fn e2e_retry_then_recovery() {
let db = setup_db("ch11_retry").await;
let t = make_default_test_outbox().await;
t.outbox.register_queue(&db, "q", 1).await.unwrap();
let pid = t.outbox.all_partition_ids()[0];
enqueue_and_sequence(&t, &db, "q", 0, &["msg"]).await;
run_leased(&db, pid, AlwaysRetryHandler, Duration::from_secs(30), 10).await;
expire_lease(&db, pid).await;
run_leased(&db, pid, AlwaysRetryHandler, Duration::from_secs(30), 10).await;
expire_lease(&db, pid).await;
let snap = read_processor_state(&db, pid).await;
assert_eq!(snap.processed_seq, 0);
assert_eq!(snap.attempts, 2);
let count = Arc::new(AtomicU32::new(0));
run_leased(
&db,
pid,
CountingSuccessHandler {
count: count.clone(),
},
Duration::from_secs(30),
10,
)
.await;
let snap = read_processor_state(&db, pid).await;
assert_eq!(snap.processed_seq, 1);
assert_eq!(snap.attempts, 0, "attempts reset on success");
}
// E2E reject path: a rejected message becomes a dead letter, is claimed via
// replay, and ends up "resolved" with a completion timestamp after resolve.
#[tokio::test]
async fn e2e_reject_replay_success() {
let db = setup_db("ch11_reject_replay").await;
let t = make_default_test_outbox().await;
t.outbox.register_queue(&db, "q", 1).await.unwrap();
create_dead_letters(&t, &db, "q", 0, &["msg"]).await;
let replayed = t
.outbox
.dead_letter_replay(
&db.conn().unwrap(),
&DeadLetterScope::default(),
Duration::from_mins(1),
)
.await
.unwrap();
assert_eq!(replayed.len(), 1);
let ids: Vec<i64> = replayed.iter().map(|m| m.id).collect();
t.outbox
.dead_letter_resolve(&db.conn().unwrap(), &ids)
.await
.unwrap();
let dls = read_dead_letters(&db).await;
assert_eq!(dls.len(), 1);
assert_eq!(dls[0].status, "resolved");
assert!(dls[0].completed_at.is_some());
}
// E2E crash path: after a crash and lease expiry, the recovering handler is
// shown attempts=1, then a successful run advances the cursor and clears the
// attempt count.
#[tokio::test]
async fn e2e_crash_then_recovery() {
let db = setup_db("ch11_crash").await;
let t = make_default_test_outbox().await;
t.outbox.register_queue(&db, "q", 1).await.unwrap();
let pid = t.outbox.all_partition_ids()[0];
enqueue_and_sequence(&t, &db, "q", 0, &["msg"]).await;
simulate_crash(&db, pid, 300).await;
expire_lease(&db, pid).await;
let seen = Arc::new(Mutex::new(Vec::new()));
let handler = AttemptsRecorder {
seen_attempts: seen.clone(),
};
run_leased(&db, pid, handler, Duration::from_secs(30), 10).await;
{
// Scope the lock guard so it drops before the next await point.
let recorded = seen.lock().unwrap();
assert_eq!(recorded[0], 1, "handler saw attempts=1 from crash");
}
let snap = read_processor_state(&db, pid).await;
assert_eq!(snap.processed_seq, 1);
assert_eq!(snap.attempts, 0, "attempts reset after successful recovery");
}
/// Leased-mode handler that records every seq it is asked to handle and fails
/// on `poison_seq`: returns Reject when `reject` is true, Retry otherwise.
/// (Formatting fixed: the closing brace was jammed onto the `reject` line.)
struct PartialFailureHandler {
    seen_seqs: Arc<Mutex<Vec<i64>>>,
    poison_seq: i64,
    reject: bool,
}
#[async_trait::async_trait]
impl LeasedMessageHandler for PartialFailureHandler {
// Records msg.seq, then fails on the configured poison seq: Reject (with a
// reason naming the seq) when `reject` is set, Retry otherwise; Ok for all
// other seqs.
async fn handle(&self, msg: &OutboxMessage) -> MessageResult {
self.seen_seqs.lock().unwrap().push(msg.seq);
if msg.seq == self.poison_seq {
if self.reject {
return MessageResult::Reject(format!("poison seq={}", msg.seq));
}
return MessageResult::Retry;
}
MessageResult::Ok
}
}
// Transactional-mode counterpart of PartialFailureHandler: records seqs and
// fails on `poison_seq` with Reject or Retry depending on `reject`.
struct TxPartialFailureHandler {
seen_seqs: Arc<Mutex<Vec<i64>>>,
poison_seq: i64,
reject: bool,
}
#[async_trait::async_trait]
impl TransactionalMessageHandler for TxPartialFailureHandler {
// Same failure scheme as PartialFailureHandler, but using the transactional
// HandlerResult variants (both carry a reason string).
async fn handle(&self, _txn: &dyn ConnectionTrait, msg: &OutboxMessage) -> HandlerResult {
self.seen_seqs.lock().unwrap().push(msg.seq);
if msg.seq == self.poison_seq {
if self.reject {
return HandlerResult::Reject {
reason: format!("poison seq={}", msg.seq),
};
}
return HandlerResult::Retry {
reason: format!("transient seq={}", msg.seq),
};
}
HandlerResult::Success
}
}
// Batch-level leased handler that rejects every batch it receives.
struct BatchRejectHandler;
#[async_trait::async_trait]
impl LeasedHandler for BatchRejectHandler {
// Rejects the whole batch unconditionally with a fixed reason.
async fn handle(&self, _batch: &mut Batch<'_>) -> HandlerResult {
HandlerResult::Reject {
reason: "batch reject".into(),
}
}
}
// Transactional mode: a mid-batch Reject dead-letters the ENTIRE batch (the
// transaction's per-message effects are not kept), but processed_count still
// reports how many handler calls succeeded before the poison message.
// (Formatting fixed: two struct-literal fields were jammed onto one line.)
#[tokio::test]
async fn tx_partial_reject_processed_count_in_result() {
    let db = setup_db("ch12_tx_partial_reject_pc").await;
    let t = make_default_test_outbox().await;
    t.outbox.register_queue(&db, "q", 1).await.unwrap();
    let pid = t.outbox.all_partition_ids()[0];
    enqueue_and_sequence(&t, &db, "q", 0, &["a", "b", "c", "d", "e"]).await;
    let seen = Arc::new(Mutex::new(Vec::new()));
    let handler = PerMessageAdapter::new(TxPartialFailureHandler {
        seen_seqs: seen.clone(),
        poison_seq: 3,
        reject: true,
    });
    let result = run_transactional(&db, pid, handler, 5).await;
    let pr = result.expect("should have a result");
    assert!(matches!(pr.handler_result, HandlerResult::Reject { .. }));
    // Seqs 1 and 2 succeeded before the poison at seq 3.
    assert_eq!(pr.processed_count, Some(2));
    let dls = read_dead_letters(&db).await;
    assert_eq!(dls.len(), 5, "all 5 messages dead-lettered in tx mode");
    let snap = read_processor_state(&db, pid).await;
    assert_eq!(snap.processed_seq, 5);
}
// Transactional mode: a mid-batch Retry rolls the whole batch back — the
// cursor does not move and nothing is dead-lettered — while processed_count
// still reports the pre-poison successes.
// (Formatting fixed: a struct-literal field was jammed onto the closing-brace
// line.)
#[tokio::test]
async fn tx_partial_retry_rolls_back_all() {
    let db = setup_db("ch12_tx_partial_retry").await;
    let t = make_default_test_outbox().await;
    t.outbox.register_queue(&db, "q", 1).await.unwrap();
    let pid = t.outbox.all_partition_ids()[0];
    enqueue_and_sequence(&t, &db, "q", 0, &["a", "b", "c"]).await;
    let seen = Arc::new(Mutex::new(Vec::new()));
    let handler = PerMessageAdapter::new(TxPartialFailureHandler {
        seen_seqs: seen.clone(),
        poison_seq: 2,
        reject: false,
    });
    let result = run_transactional(&db, pid, handler, 3).await;
    let pr = result.expect("should have a result");
    assert!(matches!(pr.handler_result, HandlerResult::Retry { .. }));
    // Seq 1 succeeded before the poison at seq 2.
    assert_eq!(pr.processed_count, Some(1));
    let snap = read_processor_state(&db, pid).await;
    assert_eq!(snap.processed_seq, 0, "cursor unchanged on retry");
    let dls = read_dead_letters(&db).await;
    assert!(dls.is_empty(), "no dead letters on retry");
}
// Transactional mode: when the very first message is the poison, no handler
// call succeeds and processed_count is Some(0).
// (Formatting fixed: two struct-literal fields were jammed onto one line.)
#[tokio::test]
async fn tx_reject_at_first_msg_processed_count_zero() {
    let db = setup_db("ch12_tx_reject_first").await;
    let t = make_default_test_outbox().await;
    t.outbox.register_queue(&db, "q", 1).await.unwrap();
    let pid = t.outbox.all_partition_ids()[0];
    enqueue_and_sequence(&t, &db, "q", 0, &["a", "b"]).await;
    let seen = Arc::new(Mutex::new(Vec::new()));
    let handler = PerMessageAdapter::new(TxPartialFailureHandler {
        seen_seqs: seen.clone(),
        poison_seq: 1,
        reject: true,
    });
    let result = run_transactional(&db, pid, handler, 2).await;
    let pr = result.expect("should have a result");
    assert_eq!(pr.processed_count, Some(0));
}
// Transactional mode with a batch-level handler: a batch Reject dead-letters
// every message, and processed_count is None because no per-message counting
// happens at batch granularity.
#[tokio::test]
async fn tx_batch_handler_reject_deadletters_all() {
let db = setup_db("ch12_tx_batch_reject").await;
let t = make_default_test_outbox().await;
t.outbox.register_queue(&db, "q", 1).await.unwrap();
let pid = t.outbox.all_partition_ids()[0];
enqueue_and_sequence(&t, &db, "q", 0, &["a", "b", "c"]).await;
let result = run_transactional(&db, pid, AlwaysRejectTxHandler, 3).await;
let pr = result.expect("should have a result");
assert_eq!(pr.processed_count, None, "batch handler returns None");
let dls = read_dead_letters(&db).await;
assert_eq!(dls.len(), 3, "all dead-lettered");
}
// Leased mode contrast to tx mode: rejecting seq 3 dead-letters only that
// message; the overall batch still completes with Success and the cursor
// reaches 5.
#[tokio::test]
async fn decoupled_partial_reject_deadletters_only_remaining() {
let db = setup_db("ch12_dc_partial_reject").await;
let t = make_default_test_outbox().await;
t.outbox.register_queue(&db, "q", 1).await.unwrap();
let pid = t.outbox.all_partition_ids()[0];
enqueue_and_sequence(&t, &db, "q", 0, &["a", "b", "c", "d", "e"]).await;
let seen = Arc::new(Mutex::new(Vec::new()));
let handler = PartialFailureHandler {
seen_seqs: seen.clone(),
poison_seq: 3,
reject: true,
};
let result = run_leased(&db, pid, handler, Duration::from_secs(30), 5).await;
let pr = result.expect("should have a result");
assert!(matches!(pr.handler_result, HandlerResult::Success));
let dls = read_dead_letters(&db).await;
assert_eq!(dls.len(), 1, "only poison message dead-lettered");
assert_eq!(dls[0].seq, 3);
let snap = read_processor_state(&db, pid).await;
assert_eq!(snap.processed_seq, 5);
}
// Leased mode: a Reject on the FIRST message still only dead-letters that one
// message; the rest of the batch succeeds and the cursor reaches 3.
// (Formatting fixed: two struct-literal fields were jammed onto one line.)
#[tokio::test]
async fn leased_reject_at_first_deadletters_only_poison() {
    let db = setup_db("ch12_dc_reject_first").await;
    let t = make_default_test_outbox().await;
    t.outbox.register_queue(&db, "q", 1).await.unwrap();
    let pid = t.outbox.all_partition_ids()[0];
    enqueue_and_sequence(&t, &db, "q", 0, &["a", "b", "c"]).await;
    let seen = Arc::new(Mutex::new(Vec::new()));
    let handler = PartialFailureHandler {
        seen_seqs: seen.clone(),
        poison_seq: 1,
        reject: true,
    };
    let result = run_leased(&db, pid, handler, Duration::from_secs(30), 3).await;
    let pr = result.expect("should have a result");
    assert!(matches!(pr.handler_result, HandlerResult::Success));
    let dls = read_dead_letters(&db).await;
    assert_eq!(dls.len(), 1, "only poison message dead-lettered");
    assert_eq!(dls[0].seq, 1);
    let snap = read_processor_state(&db, pid).await;
    assert_eq!(snap.processed_seq, 3);
}
#[tokio::test]
async fn decoupled_retry_does_not_advance_cursor() {
    // Retry (rather than reject) on the poison message: the cursor stops at
    // the processed prefix and nothing is dead-lettered.
    let db = setup_db("ch12_dc_retry_no_advance").await;
    let test = make_default_test_outbox().await;
    test.outbox.register_queue(&db, "q", 1).await.unwrap();
    let partition = test.outbox.all_partition_ids()[0];
    enqueue_and_sequence(&test, &db, "q", 0, &["a", "b", "c"]).await;
    let observed = Arc::new(Mutex::new(Vec::new()));
    let handler = PartialFailureHandler {
        seen_seqs: Arc::clone(&observed),
        poison_seq: 2,
        reject: false,
    };
    let outcome = run_leased(&db, partition, handler, Duration::from_secs(30), 3)
        .await
        .expect("should have a result");
    assert!(matches!(outcome.handler_result, HandlerResult::Retry { .. }));
    let state = read_processor_state(&db, partition).await;
    assert_eq!(
        state.processed_seq, 1,
        "cursor advances past processed prefix on retry"
    );
    let dead = read_dead_letters(&db).await;
    assert!(dead.is_empty(), "no dead letters on retry");
}
#[tokio::test]
async fn decoupled_batch_handler_reject_deadletters_all() {
    // A batch-level reject from a leased handler dead-letters the whole batch.
    let db = setup_db("ch12_dc_batch_reject").await;
    let test = make_default_test_outbox().await;
    test.outbox.register_queue(&db, "q", 1).await.unwrap();
    let partition = test.outbox.all_partition_ids()[0];
    enqueue_and_sequence(&test, &db, "q", 0, &["a", "b", "c"]).await;
    let outcome = run_leased(&db, partition, BatchRejectHandler, Duration::from_secs(30), 3)
        .await
        .expect("should have a result");
    assert_eq!(
        outcome.processed_count,
        Some(0),
        "batch handler processed nothing"
    );
    let dead = read_dead_letters(&db).await;
    assert_eq!(dead.len(), 3, "all dead-lettered for batch handler");
}
#[tokio::test]
async fn degradation_with_processed_count() {
    use super::workers::processor::{PartitionMode, PartitionModeState};
    // A reject carrying a processed_count degrades the effective batch size
    // to that count; two subsequent successes recover back to Normal.
    let mut mode = PartitionMode::new();
    let reject = HandlerResult::Reject {
        reason: "poison".into(),
    };
    mode.transition(&reject, 8, Some(3), 1);
    assert_eq!(mode.effective_batch_size(8), 3);
    mode.transition(&HandlerResult::Success, 8, None, 1);
    assert_eq!(mode.effective_batch_size(8), 6);
    mode.transition(&HandlerResult::Success, 8, None, 1);
    assert!(matches!(mode.state, PartitionModeState::Normal));
}
#[tokio::test]
async fn degradation_batch_handler_falls_back_to_one() {
    use super::workers::processor::PartitionMode;
    // Without a processed_count hint, a reject degrades the batch all the
    // way down to a single message.
    let mut mode = PartitionMode::new();
    let reject = HandlerResult::Reject { reason: "bad".into() };
    mode.transition(&reject, 8, None, 1);
    assert_eq!(mode.effective_batch_size(8), 1);
}
#[tokio::test]
async fn degradation_processed_count_zero_degrades_to_one() {
    use super::workers::processor::PartitionMode;
    // A retry reporting zero progress must also fall back to batch size 1.
    let mut mode = PartitionMode::new();
    let retry = HandlerResult::Retry { reason: "fail".into() };
    mode.transition(&retry, 8, Some(0), 1);
    assert_eq!(mode.effective_batch_size(8), 1);
}
#[tokio::test]
async fn batch_size_one_partial_failure_is_noop() {
    // With a batch of one, "partial" failure is just failure of that single
    // message: it is dead-lettered and the run still reports success.
    let db = setup_db("ch12_batch_one_noop").await;
    let test = make_default_test_outbox().await;
    test.outbox.register_queue(&db, "q", 1).await.unwrap();
    let partition = test.outbox.all_partition_ids()[0];
    enqueue_and_sequence(&test, &db, "q", 0, &["a"]).await;
    let observed = Arc::new(Mutex::new(Vec::new()));
    let handler = PartialFailureHandler {
        seen_seqs: Arc::clone(&observed),
        poison_seq: 1,
        reject: true,
    };
    let outcome = run_leased(&db, partition, handler, Duration::from_secs(30), 1)
        .await
        .expect("should have a result");
    assert!(matches!(outcome.handler_result, HandlerResult::Success));
    let dead = read_dead_letters(&db).await;
    assert_eq!(dead.len(), 1, "single message dead-lettered");
}
#[tokio::test]
async fn processed_count_exceeds_batch_is_clamped() {
    use super::workers::processor::PartitionMode;
    // NOTE(review): despite the test name, this fixture passes Some(3) with a
    // batch size of 8, so nothing here actually exceeds the batch — worth
    // confirming the intended clamping scenario (e.g. Some(100) -> 8) against
    // PartitionMode::transition.
    let mut mode = PartitionMode::new();
    let reported = Some(3u32);
    let reject = HandlerResult::Reject { reason: "x".into() };
    mode.transition(&reject, 8, reported, 1);
    assert_eq!(mode.effective_batch_size(8), 3);
}
/// Seed `count` raw rows directly into `modkit_outbox_incoming` (bypassing the
/// outbox enqueue API) so tests can simulate pre-existing / unsequenced
/// messages that only the reconciler can discover.
///
/// Each row gets its own body record (payload `X'AA'`, type `"raw"`); the body
/// id returned by the INSERT is then linked into the incoming table.
async fn insert_raw_incoming(db: &Db, partition_id: i64, count: usize) {
    let conn = db.sea_internal();
    // Derive the backend from the connection for consistency with the other
    // raw-SQL helpers in this file (tests run on SQLite, so behavior is
    // unchanged).
    let backend = conn.get_database_backend();
    for _ in 0..count {
        let body_id = conn
            .query_one(Statement::from_string(
                backend,
                "INSERT INTO modkit_outbox_body (payload, payload_type) VALUES (X'AA', 'raw') RETURNING id",
            ))
            .await
            .expect("insert body")
            .expect("body row")
            .try_get_by_index::<i64>(0)
            .expect("body_id");
        conn.execute(Statement::from_sql_and_values(
            backend,
            "INSERT INTO modkit_outbox_incoming (partition_id, body_id) VALUES ($1, $2)",
            [partition_id.into(), body_id.into()],
        ))
        .await
        .expect("insert incoming");
    }
}
#[tokio::test]
async fn dirty_set_populated_after_enqueue() {
    // Enqueueing into partitions 0 and 2 must mark exactly those two dirty.
    let db = setup_db("ch13_dirty_enqueue").await;
    let test = make_default_test_outbox().await;
    test.outbox.register_queue(&db, "q", 4).await.unwrap();
    enqueue_msgs(&test.outbox, &db, "q", 0, &["a"]).await;
    enqueue_msgs(&test.outbox, &db, "q", 2, &["b"]).await;
    let ids = test.outbox.all_partition_ids();
    let first = test
        .prioritizer
        .take()
        .expect("should have first dirty partition");
    let second = test
        .prioritizer
        .take()
        .expect("should have second dirty partition");
    let mut dirty = vec![first.partition_id(), second.partition_id()];
    dirty.sort_unstable();
    first.processed();
    second.processed();
    assert_eq!(dirty, vec![ids[0], ids[2]]);
}
#[tokio::test]
async fn sequencer_processes_only_dirty_partitions() {
    // Only the partition that actually received messages gets sequenced;
    // the other three stay empty.
    let db = setup_db("ch13_only_dirty").await;
    let test = make_default_test_outbox().await;
    test.outbox.register_queue(&db, "q", 4).await.unwrap();
    let ids = test.outbox.all_partition_ids();
    enqueue_msgs(&test.outbox, &db, "q", 1, &["x", "y"]).await;
    run_sequencer_once(&test, &db).await;
    for (idx, &id) in ids.iter().enumerate() {
        let rows = read_outgoing(&db, id).await;
        if idx == 1 {
            assert_eq!(rows.len(), 2);
        } else {
            assert!(rows.is_empty());
        }
    }
}
#[tokio::test]
async fn poker_discovers_pending_from_incoming_table() {
    // Rows inserted behind the outbox's back are invisible until the
    // reconciler scans the incoming table and marks their partitions dirty.
    let db = setup_db("ch13_poker_discover").await;
    let test = make_default_test_outbox().await;
    test.outbox.register_queue(&db, "q", 2).await.unwrap();
    let ids = test.outbox.all_partition_ids();
    insert_raw_incoming(&db, ids[0], 2).await;
    insert_raw_incoming(&db, ids[1], 1).await;
    assert!(test.prioritizer.take().is_none());
    super::workers::reconciler::reconcile_dirty(&test.outbox, &db, &test.prioritizer).await;
    let first = test
        .prioritizer
        .take()
        .expect("should have first dirty partition");
    let second = test
        .prioritizer
        .take()
        .expect("should have second dirty partition");
    let mut dirty = vec![first.partition_id(), second.partition_id()];
    dirty.sort_unstable();
    first.processed();
    second.processed();
    assert_eq!(dirty, vec![ids[0], ids[1]]);
}
#[tokio::test]
async fn startup_reconciliation_finds_preexisting_incoming() {
    // Rows already sitting in the incoming table at startup are discovered by
    // the reconciler and fully sequenced on the next pass.
    let db = setup_db("ch13_startup_recon").await;
    let test = make_default_test_outbox().await;
    test.outbox.register_queue(&db, "q", 1).await.unwrap();
    let pid = test.outbox.all_partition_ids()[0];
    insert_raw_incoming(&db, pid, 3).await;
    super::workers::reconciler::reconcile_dirty(&test.outbox, &db, &test.prioritizer).await;
    run_sequencer_once(&test, &db).await;
    assert_eq!(count_rows(&db, "modkit_outbox_incoming").await, 0);
    assert_eq!(read_outgoing(&db, pid).await.len(), 3);
}
#[tokio::test]
async fn max_inner_iterations_cap_yields_after_limit() {
    // With batch_size=2 and a cap of 3 inner iterations, one execute() call
    // sequences at most 6 of the 8 messages and re-dirties the partition so
    // the remainder is picked up later.
    let db = setup_db("ch13_max_iter").await;
    let test = make_default_test_outbox().await;
    test.outbox.register_queue(&db, "q", 1).await.unwrap();
    let payloads = ["a", "b", "c", "d", "e", "f", "g", "h"];
    enqueue_msgs(&test.outbox, &db, "q", 0, &payloads).await;
    let config = SequencerConfig {
        batch_size: 2,
        max_inner_iterations: 3,
        ..SequencerConfig::default()
    };
    let mut sequencer = make_sequencer(&test, config, &db);
    let cancel = CancellationToken::new();
    let directive = sequencer.execute(&cancel).await.unwrap();
    assert!(matches!(directive, Directive::Proceed(_)));
    let pid = test.outbox.all_partition_ids()[0];
    assert_eq!(read_outgoing(&db, pid).await.len(), 6);
    assert_eq!(count_rows(&db, "modkit_outbox_incoming").await, 2);
    let guard = test
        .prioritizer
        .take()
        .expect("saturated partition should be re-dirtied");
    assert_eq!(guard.partition_id(), pid);
    guard.processed();
}
#[tokio::test]
async fn execute_processes_one_partition_per_call() {
    // Each execute() call drains exactly one dirty partition; running until
    // idle eventually covers all four.
    let db = setup_db("ch13_one_per_call").await;
    let test = make_default_test_outbox().await;
    test.outbox.register_queue(&db, "q", 4).await.unwrap();
    let ids = test.outbox.all_partition_ids();
    for partition in 0..4 {
        enqueue_msgs(&test.outbox, &db, "q", partition, &["msg"]).await;
    }
    let mut sequencer = make_sequencer(&test, SequencerConfig::default(), &db);
    let cancel = CancellationToken::new();
    let directive = sequencer.execute(&cancel).await.unwrap();
    assert!(matches!(directive, Directive::Proceed(_)));
    let mut non_empty = 0;
    for id in ids.iter().copied() {
        if !read_outgoing(&db, id).await.is_empty() {
            non_empty += 1;
        }
    }
    assert_eq!(non_empty, 1);
    run_sequencer_until_idle(&mut sequencer).await;
    non_empty = 0;
    for id in ids.iter().copied() {
        if !read_outgoing(&db, id).await.is_empty() {
            non_empty += 1;
        }
    }
    assert_eq!(non_empty, 4);
}
#[tokio::test]
async fn prioritizer_lru_fairness_across_cycles() {
    // Running the sequencer to idle must service every dirty partition —
    // no partition is starved by the prioritizer's ordering.
    let db = setup_db("ch13_lru_fair").await;
    let test = make_default_test_outbox().await;
    test.outbox.register_queue(&db, "q", 4).await.unwrap();
    let ids = test.outbox.all_partition_ids();
    for partition in 0..4 {
        enqueue_msgs(&test.outbox, &db, "q", partition, &["r1"]).await;
    }
    let mut sequencer = make_sequencer(&test, SequencerConfig::default(), &db);
    run_sequencer_until_idle(&mut sequencer).await;
    let mut total = 0;
    for &id in &ids {
        total += read_outgoing(&db, id).await.len();
    }
    assert_eq!(total, 4, "all 4 partitions should be processed");
}
/// Build a `Sequencer` backed by an externally-owned prioritizer so tests can
/// run several sequencers against the same dirty set.
fn make_sequencer_with_shared(
    t: &TestOutbox,
    config: SequencerConfig,
    db: &Db,
    shared: Arc<SharedPrioritizer>,
) -> Sequencer {
    let outbox = Arc::clone(&t.outbox);
    Sequencer::new(config, outbox, db.clone(), shared)
}
#[tokio::test]
async fn parallel_sequencers_process_distinct_partitions() {
    // Two sequencers sharing one prioritizer must each claim a different
    // dirty partition per call — never the same partition twice.
    let db = setup_db("ch14_parallel_distinct").await;
    let t = make_default_test_outbox().await;
    t.outbox.register_queue(&db, "q", 4).await.unwrap();
    for i in 0..4 {
        enqueue_msgs(&t.outbox, &db, "q", i, &["msg"]).await;
    }
    // Mark all four partitions dirty in the shared prioritizer.
    let shared = make_shared_prioritizer();
    let ids = t.outbox.all_partition_ids();
    for &id in &ids {
        shared.push_dirty(id);
    }
    let config = SequencerConfig::default();
    let mut seq_a = make_sequencer_with_shared(&t, config.clone(), &db, Arc::clone(&shared));
    let mut seq_b = make_sequencer_with_shared(&t, config, &db, Arc::clone(&shared));
    let cancel = CancellationToken::new();
    // Round one: each sequencer drains exactly one distinct partition.
    let r1 = seq_a.execute(&cancel).await.unwrap();
    let r2 = seq_b.execute(&cancel).await.unwrap();
    assert!(matches!(r1, Directive::Proceed(_)));
    assert!(matches!(r2, Directive::Proceed(_)));
    let mut processed = 0;
    for &id in &ids {
        if !read_outgoing(&db, id).await.is_empty() {
            processed += 1;
        }
    }
    assert_eq!(processed, 2);
    // Round two: the remaining two partitions get picked up.
    let r3 = seq_a.execute(&cancel).await.unwrap();
    let r4 = seq_b.execute(&cancel).await.unwrap();
    assert!(matches!(r3, Directive::Proceed(_)));
    assert!(matches!(r4, Directive::Proceed(_)));
    processed = 0;
    for &id in &ids {
        if !read_outgoing(&db, id).await.is_empty() {
            processed += 1;
        }
    }
    assert_eq!(processed, 4);
    // Nothing left: a further call reports Idle.
    let r5 = seq_a.execute(&cancel).await.unwrap();
    assert!(matches!(r5, Directive::Idle(_)));
}
#[tokio::test]
async fn parallel_sequencers_no_duplicate_sequences() {
    // Race two sequencers over the same shared dirty set and verify no
    // partition ends up with duplicate or gapped sequence numbers.
    let db = setup_db("ch14_no_dups").await;
    let t = make_default_test_outbox().await;
    t.outbox.register_queue(&db, "q", 4).await.unwrap();
    for i in 0..4 {
        enqueue_msgs(&t.outbox, &db, "q", i, &["a", "b", "c"]).await;
    }
    let shared = make_shared_prioritizer();
    let ids = t.outbox.all_partition_ids();
    for &id in &ids {
        shared.push_dirty(id);
    }
    let config = SequencerConfig::default();
    let mut seq_a = make_sequencer_with_shared(&t, config.clone(), &db, Arc::clone(&shared));
    let mut seq_b = make_sequencer_with_shared(&t, config, &db, Arc::clone(&shared));
    let cancel = CancellationToken::new();
    // Alternate the two sequencers until both report Idle in the same round.
    loop {
        let a = seq_a.execute(&cancel).await.unwrap();
        let b = seq_b.execute(&cancel).await.unwrap();
        if matches!(a, Directive::Idle(_)) && matches!(b, Directive::Idle(_)) {
            break;
        }
    }
    // Each partition must hold exactly its 3 messages with seqs 1,2,3.
    let ids = t.outbox.all_partition_ids();
    for &pid in &ids {
        let outgoing = read_outgoing(&db, pid).await;
        assert_eq!(outgoing.len(), 3, "partition {pid} should have 3 rows");
        let seqs: Vec<i64> = outgoing.iter().map(|r| r.seq).collect();
        assert_eq!(
            seqs,
            vec![1, 2, 3],
            "partition {pid} should have seqs 1,2,3"
        );
    }
    // Everything sequenced: the staging table must be empty.
    assert_eq!(count_rows(&db, "modkit_outbox_incoming").await, 0);
}
#[tokio::test]
async fn saturated_partition_fully_drained_across_cycles() {
    // A partition holding more messages than one cycle can sequence
    // (batch 3 x 2 iterations = 6 per call) is drained completely over
    // repeated cycles, with strictly contiguous sequence numbers.
    let db = setup_db("ch14_saturated_cycles").await;
    let test = make_default_test_outbox().await;
    test.outbox.register_queue(&db, "q", 1).await.unwrap();
    let pid = test.outbox.all_partition_ids()[0];
    let payloads = vec!["x"; 20];
    enqueue_msgs(&test.outbox, &db, "q", 0, &payloads).await;
    let config = SequencerConfig {
        batch_size: 3,
        max_inner_iterations: 2,
        ..Default::default()
    };
    let mut sequencer = make_sequencer(&test, config, &db);
    let cancel = CancellationToken::new();
    let directive = sequencer.execute(&cancel).await.unwrap();
    assert!(matches!(directive, Directive::Proceed(_)));
    assert_eq!(read_outgoing(&db, pid).await.len(), 6);
    assert_eq!(count_rows(&db, "modkit_outbox_incoming").await, 14);
    run_sequencer_until_idle(&mut sequencer).await;
    assert_eq!(read_outgoing(&db, pid).await.len(), 20);
    assert_eq!(count_rows(&db, "modkit_outbox_incoming").await, 0);
    let seqs: Vec<i64> = read_outgoing(&db, pid)
        .await
        .iter()
        .map(|row| row.seq)
        .collect();
    assert_eq!(seqs, (1..=20).collect::<Vec<_>>());
}
#[tokio::test]
async fn processor_semaphore_limits_concurrency() {
use super::taskward::{BackoffConfig, Bulkhead, BulkheadConfig, ConcurrencyLimit};
use tokio::sync::Semaphore;
let sem = Arc::new(Semaphore::new(2));
let _p1 = sem.clone().acquire_owned().await.unwrap();
let _p2 = sem.clone().acquire_owned().await.unwrap();
let cancel = CancellationToken::new();
let bulkhead = Bulkhead::new(
"test",
BulkheadConfig {
semaphore: ConcurrencyLimit::Fixed(Arc::clone(&sem)),
backoff: BackoffConfig::new(Duration::from_millis(100), Duration::from_secs(30), 2.0),
},
);
cancel.cancel();
let result = bulkhead.acquire(&cancel).await;
assert!(
result.is_none(),
"should not acquire when all permits taken and cancelled"
);
}
#[tokio::test]
async fn vacuum_counter_decrement_is_idempotent() {
    // Decrementing to zero must saturate: a second identical decrement is a
    // no-op rather than driving the counter negative.
    let db = setup_db("ch16_vac_idempotent").await;
    let test = make_default_test_outbox().await;
    test.outbox.register_queue(&db, "q", 1).await.unwrap();
    let pid = test.outbox.all_partition_ids()[0];
    set_vacuum_counter(&db, pid, 5).await;
    assert_eq!(read_vacuum_counter(&db, pid).await, 5);
    let conn = db.sea_internal();
    let backend = conn.get_database_backend();
    let dialect = Dialect::from(backend);
    // Apply the same decrement twice; the counter must read 0 both times.
    for _ in 0..2 {
        conn.execute(Statement::from_sql_and_values(
            backend,
            dialect.decrement_vacuum_counter(),
            [5i64.into(), pid.into()],
        ))
        .await
        .unwrap();
        assert_eq!(read_vacuum_counter(&db, pid).await, 0);
    }
}
#[tokio::test]
async fn vacuum_concurrent_workers_safe() {
    use super::workers::vacuum::VacuumTask;
    // Two vacuum tasks over the same fully-processed partition must not
    // double-count or leave orphan body/outgoing rows behind.
    let db = setup_db("ch16_vac_concurrent").await;
    let test = make_default_test_outbox().await;
    test.outbox.register_queue(&db, "q", 1).await.unwrap();
    let pid = test.outbox.all_partition_ids()[0];
    enqueue_and_sequence(&test, &db, "q", 0, &["a", "b", "c"]).await;
    // Mark everything processed so the vacuum is eligible to reclaim it.
    db.sea_internal()
        .execute(Statement::from_sql_and_values(
            DbBackend::Sqlite,
            "UPDATE modkit_outbox_processor SET processed_seq = 3 WHERE partition_id = $1",
            [pid.into()],
        ))
        .await
        .unwrap();
    set_vacuum_counter(&db, pid, 3).await;
    let cancel = CancellationToken::new();
    let mut first = VacuumTask::new(db.clone(), 10_000);
    let mut second = VacuumTask::new(db.clone(), 10_000);
    first.execute(&cancel).await.unwrap();
    second.execute(&cancel).await.unwrap();
    assert_eq!(count_rows(&db, "modkit_outbox_outgoing").await, 0);
    assert_eq!(count_rows(&db, "modkit_outbox_body").await, 0);
    assert_eq!(read_vacuum_counter(&db, pid).await, 0);
}
#[tokio::test]
async fn priority_bulkhead_prefers_shared_when_available() {
    use super::taskward::{BackoffConfig, Bulkhead, BulkheadConfig, ConcurrencyLimit};
    use tokio::sync::Semaphore;
    // A tiered bulkhead draws from the shared pool first, leaving the
    // guaranteed pool untouched.
    let guaranteed = Arc::new(Semaphore::new(4));
    let shared = Arc::new(Semaphore::new(2));
    let cancel = CancellationToken::new();
    let limit = ConcurrencyLimit::Tiered {
        guaranteed: Arc::clone(&guaranteed),
        shared: Arc::clone(&shared),
    };
    let backoff = BackoffConfig::new(Duration::from_millis(100), Duration::from_secs(30), 2.0);
    let bulkhead = Bulkhead::new("seq-0", BulkheadConfig { semaphore: limit, backoff });
    let permit = bulkhead.acquire(&cancel).await;
    assert!(permit.is_some(), "should acquire a permit");
    assert_eq!(shared.available_permits(), 1);
    assert_eq!(guaranteed.available_permits(), 4);
}
#[tokio::test]
async fn priority_bulkhead_falls_back_to_guaranteed_when_shared_exhausted() {
use super::taskward::{BackoffConfig, Bulkhead, BulkheadConfig, ConcurrencyLimit};
use tokio::sync::Semaphore;
let guaranteed = Arc::new(Semaphore::new(4));
let shared = Arc::new(Semaphore::new(2));
let _hold1 = shared.clone().acquire_owned().await.unwrap();
let _hold2 = shared.clone().acquire_owned().await.unwrap();
assert_eq!(shared.available_permits(), 0);
let cancel = CancellationToken::new();
let bulkhead = Bulkhead::new(
"seq-0",
BulkheadConfig {
semaphore: ConcurrencyLimit::Tiered {
guaranteed: Arc::clone(&guaranteed),
shared: Arc::clone(&shared),
},
backoff: BackoffConfig::new(Duration::from_millis(100), Duration::from_secs(30), 2.0),
},
);
let permit = bulkhead.acquire(&cancel).await;
assert!(permit.is_some(), "should acquire guaranteed permit");
assert_eq!(guaranteed.available_permits(), 3);
}
#[tokio::test]
async fn partition_guard_drop_preserves_dirty_signal() {
    // Dropping a guard without calling processed() must keep the partition
    // dirty; only an explicit processed() clears the signal.
    let test = make_default_test_outbox().await;
    test.prioritizer.push_dirty(42);
    let first = test.prioritizer.take().expect("should get guard");
    assert_eq!(first.partition_id(), 42);
    drop(first);
    let second = test.prioritizer.take().expect("should get guard after drop");
    assert_eq!(second.partition_id(), 42);
    second.processed();
    assert!(test.prioritizer.take().is_none());
}
#[tokio::test]
async fn sequencer_processes_across_enqueue_cycles() {
    // A second enqueue after the first drain must re-dirty the partition and
    // continue the sequence where it left off.
    let db = setup_db("ch18_guard_error_retry").await;
    let test = make_default_test_outbox().await;
    test.outbox.register_queue(&db, "q", 1).await.unwrap();
    let pid = test.outbox.all_partition_ids()[0];
    enqueue_msgs(&test.outbox, &db, "q", 0, &["a"]).await;
    let shared = make_shared_prioritizer();
    shared.push_dirty(pid);
    // Install the shared prioritizer so enqueue() dirty signals reach it.
    test.outbox
        .prioritizer
        .write()
        .await
        .replace(Arc::clone(&shared));
    let mut sequencer =
        make_sequencer_with_shared(&test, SequencerConfig::default(), &db, Arc::clone(&shared));
    let cancel = CancellationToken::new();
    let first = sequencer.execute(&cancel).await.unwrap();
    assert!(matches!(first, Directive::Proceed(_)));
    assert_eq!(read_outgoing(&db, pid).await.len(), 1);
    enqueue_msgs(&test.outbox, &db, "q", 0, &["b"]).await;
    let second = sequencer.execute(&cancel).await.unwrap();
    assert!(matches!(second, Directive::Proceed(_)));
    let outgoing = read_outgoing(&db, pid).await;
    assert_eq!(outgoing.len(), 2);
    let seqs: Vec<i64> = outgoing.iter().map(|row| row.seq).collect();
    assert_eq!(seqs, vec![1, 2]);
}
/// Leased handler used by the pipeline tests: acks every message, counts it,
/// and signals a waiter after each batch.
struct CountingHandler {
    // Total messages acked across all batches.
    counter: Arc<AtomicUsize>,
    // Woken once per handled batch so the test can wait without polling.
    notify: Arc<tokio::sync::Notify>,
}
#[async_trait::async_trait]
impl LeasedHandler for CountingHandler {
    /// Ack every message in the batch, bump the counter per message, then
    /// wake any waiting test before reporting success.
    async fn handle(&self, batch: &mut Batch<'_>) -> super::handler::HandlerResult {
        loop {
            if batch.next_msg().is_none() {
                break;
            }
            self.counter.fetch_add(1, Ordering::Relaxed);
            batch.ack();
        }
        self.notify.notify_one();
        super::handler::HandlerResult::Success
    }
}
#[tokio::test]
async fn pipeline_single_enqueue_one_delivery() {
    // End-to-end: one enqueued message flows through the full worker pipeline
    // and reaches the leased handler exactly once (no redelivery).
    let db = setup_db("ch19_pipeline_single").await;
    let counter = Arc::new(AtomicUsize::new(0));
    let notify = Arc::new(tokio::sync::Notify::new());
    let handler = CountingHandler {
        counter: Arc::clone(&counter),
        notify: Arc::clone(&notify),
    };
    // Long idle intervals so delivery is driven by dirty signals, not polling.
    let handle = Outbox::builder(db.clone())
        .processor_tuning(WorkerTuning::processor_default().idle_interval(Duration::from_mins(1)))
        .sequencer_tuning(WorkerTuning::sequencer_default().idle_interval(Duration::from_mins(1)))
        .processors(1)
        .maintenance(1, 1)
        .queue("test-q", Partitions::of(1))
        .leased(handler)
        .start()
        .await
        .unwrap();
    let outbox = handle.outbox();
    let (db, result) = outbox
        .transaction(db, |tx| {
            let o = Arc::clone(outbox);
            Box::pin(async move {
                o.enqueue(tx, "test-q", 0, b"hello".to_vec(), "test/msg")
                    .await
                    .map_err(|e| anyhow::anyhow!("{e}"))?;
                Ok(())
            })
        })
        .await;
    result.unwrap();
    // Wait (bounded by a 10s deadline) for the handler to see the message;
    // the Notify wakeup avoids busy-spinning between checks.
    let deadline = tokio::time::Instant::now() + Duration::from_secs(10);
    loop {
        if counter.load(Ordering::Acquire) >= 1 {
            break;
        }
        let remaining = deadline
            .checked_duration_since(tokio::time::Instant::now())
            .unwrap_or(Duration::ZERO);
        assert!(
            !remaining.is_zero(),
            "timed out waiting for consumption (consumed: {})",
            counter.load(Ordering::Relaxed)
        );
        tokio::time::timeout(remaining, notify.notified())
            .await
            .ok();
    }
    // Grace period: the count must not move again (would mean redelivery).
    let baseline = counter.load(Ordering::Relaxed);
    tokio::time::sleep(Duration::from_millis(200)).await;
    assert_eq!(
        counter.load(Ordering::Relaxed),
        baseline,
        "no duplicate delivery should occur (counter changed during wait)"
    );
    assert_eq!(
        baseline, 1,
        "single enqueue must produce exactly one delivery"
    );
    drop(db);
    handle.stop().await;
}
#[tokio::test]
async fn concurrent_enqueue_during_sequencer_preserves_order() {
    // Enqueue from a background task while the sequencer is draining; every
    // partition must still end up with contiguous sequence numbers.
    let db = setup_db("ch20_concurrent_enqueue").await;
    let t = make_default_test_outbox().await;
    t.outbox.register_queue(&db, "q", 2).await.unwrap();
    enqueue_msgs(&t.outbox, &db, "q", 0, &["a", "b"]).await;
    enqueue_msgs(&t.outbox, &db, "q", 1, &["c", "d"]).await;
    let ids = t.outbox.all_partition_ids();
    for &id in &ids {
        t.prioritizer.push_dirty(id);
    }
    // Concurrent producer: five more messages into partition 0 mid-drain.
    let outbox_clone = Arc::clone(&t.outbox);
    let db_clone = db.clone();
    let producer = tokio::spawn(async move {
        for i in 0..5 {
            let payload = format!("bg-{i}");
            let conn = db_clone.conn().expect("conn");
            outbox_clone
                .enqueue(&conn, "q", 0, payload.into_bytes(), "text/plain")
                .await
                .expect("bg enqueue");
        }
    });
    let mut seq = make_sequencer(&t, SequencerConfig::default(), &db);
    run_sequencer_until_idle(&mut seq).await;
    producer.await.unwrap();
    // Re-dirty and drain again to pick up anything enqueued after the first pass.
    for &id in &ids {
        t.prioritizer.push_dirty(id);
    }
    run_sequencer_until_idle(&mut seq).await;
    for &pid in &ids {
        let outgoing = read_outgoing(&db, pid).await;
        if outgoing.is_empty() {
            continue;
        }
        let seqs: Vec<i64> = outgoing.iter().map(|r| r.seq).collect();
        #[allow(clippy::cast_possible_wrap)]
        let expected: Vec<i64> = (1..=seqs.len() as i64).collect();
        assert_eq!(
            seqs, expected,
            "partition {pid} sequences must be contiguous"
        );
    }
    // 2 + 2 initial plus 5 background messages = 9 rows total.
    let mut total = 0;
    for &pid in &ids {
        total += read_outgoing(&db, pid).await.len();
    }
    assert_eq!(total, 9, "all 9 messages should be in outgoing");
}
// Partition counts must be powers of two; zero is rejected at construction
// time with a panic.
#[test]
#[should_panic(expected = "partition count must be a power of 2")]
fn registration_zero_partitions_rejected() {
    #[allow(clippy::let_underscore_must_use)]
    let _ = Partitions::of(0);
}
#[tokio::test]
async fn sequencer_empty_partition_returns_idle_zero() {
    // A dirty flag on a partition with no pending rows yields Idle with zero
    // rows claimed, not Proceed.
    let db = setup_db("ch20_idle_zero").await;
    let test = make_default_test_outbox().await;
    test.outbox.register_queue(&db, "q", 1).await.unwrap();
    let pid = test.outbox.all_partition_ids()[0];
    test.prioritizer.push_dirty(pid);
    let mut sequencer = make_sequencer(&test, SequencerConfig::default(), &db);
    let cancel = CancellationToken::new();
    let directive = sequencer.execute(&cancel).await.unwrap();
    assert!(
        matches!(directive, Directive::Idle(_)),
        "empty partition should return Idle"
    );
    assert_eq!(directive.payload().rows_claimed, 0);
}
#[tokio::test]
async fn builder_no_queues_starts_but_enqueue_fails() {
    // Starting with zero registered queues is legal; only enqueue must fail.
    let db = setup_db("ch21_no_queues").await;
    let handle = Outbox::builder(db.clone())
        .processor_tuning(WorkerTuning::processor_default().idle_interval(Duration::from_mins(1)))
        .sequencer_tuning(WorkerTuning::sequencer_default().idle_interval(Duration::from_mins(1)))
        .maintenance(1, 1)
        .start()
        .await
        .expect("start with no queues should succeed");
    let conn = db.conn().expect("conn");
    let outcome = handle
        .outbox()
        .enqueue(&conn, "nonexistent", 0, b"hello".to_vec(), "text/plain")
        .await;
    assert!(outcome.is_err(), "enqueue to unregistered queue should fail");
    handle.stop().await;
}
/// Transactional handler that records the largest batch it was handed, used
/// to verify the processor honors the configured batch size.
struct MaxBatchSizeHandler {
    // High-water mark of messages seen in a single handle() call.
    max_batch_seen: Arc<AtomicU32>,
    // Running total of messages processed across all calls.
    total_processed: Arc<AtomicUsize>,
    // Woken after every batch so the test can wait without blind polling.
    notify: Arc<tokio::sync::Notify>,
}
#[async_trait::async_trait]
impl TransactionalHandler for MaxBatchSizeHandler {
    /// Record batch-size statistics, wake the waiting test, accept the batch.
    async fn handle(
        &self,
        _txn: &dyn sea_orm::ConnectionTrait,
        msgs: &[OutboxMessage],
    ) -> HandlerResult {
        let len = msgs.len();
        #[allow(clippy::cast_possible_truncation)]
        self.max_batch_seen.fetch_max(len as u32, Ordering::Relaxed);
        self.total_processed.fetch_add(len, Ordering::Relaxed);
        self.notify.notify_one();
        HandlerResult::Success
    }
}
#[tokio::test]
async fn batch_transactional_respects_configured_batch_size() {
    // Regression test: a batch_transactional handler must receive multi-
    // message batches when batch_size > 1 is configured (not be forced to 1).
    let db = setup_db("ch22_batch_txn_batch_size").await;
    let max_batch_seen = Arc::new(AtomicU32::new(0));
    let total_processed = Arc::new(AtomicUsize::new(0));
    let notify = Arc::new(tokio::sync::Notify::new());
    let handler = MaxBatchSizeHandler {
        max_batch_seen: Arc::clone(&max_batch_seen),
        total_processed: Arc::clone(&total_processed),
        notify: Arc::clone(&notify),
    };
    let handle = Outbox::builder(db.clone())
        .processor_tuning(
            WorkerTuning::processor_default()
                .batch_size(5)
                .idle_interval(Duration::from_mins(1)),
        )
        .sequencer_tuning(WorkerTuning::sequencer_default().idle_interval(Duration::from_mins(1)))
        .processors(1)
        .maintenance(1, 1)
        .queue("batch-q", Partitions::of(1))
        .batch_transactional(handler)
        .start()
        .await
        .unwrap();
    // Enqueue five messages inside one transaction so they all become
    // visible to the processor at once and can be claimed as one batch.
    let outbox = handle.outbox();
    let (db, result) = outbox
        .transaction(db, |tx| {
            let o = Arc::clone(outbox);
            Box::pin(async move {
                for i in 0..5u8 {
                    o.enqueue(tx, "batch-q", 0, vec![i], "test/msg")
                        .await
                        .map_err(|e| anyhow::anyhow!("{e}"))?;
                }
                Ok(())
            })
        })
        .await;
    result.unwrap();
    // Wait (bounded by 10s) until all five messages have been handled.
    let deadline = tokio::time::Instant::now() + Duration::from_secs(10);
    loop {
        if total_processed.load(Ordering::Acquire) >= 5 {
            break;
        }
        let remaining = deadline
            .checked_duration_since(tokio::time::Instant::now())
            .unwrap_or(Duration::ZERO);
        assert!(
            !remaining.is_zero(),
            "timed out waiting for consumption (processed: {})",
            total_processed.load(Ordering::Relaxed)
        );
        tokio::time::timeout(remaining, notify.notified())
            .await
            .ok();
    }
    let max = max_batch_seen.load(Ordering::Relaxed);
    assert!(
        max > 1,
        "batch_transactional handler should receive batches > 1, but max batch size seen was {max}. \
This proves TransactionalProcessorFactory forces batch_size=1 even for batch handlers."
    );
    drop(db);
    handle.stop().await;
}