use std::time::Duration;
use tokio::time::Instant;
use super::handler::OutboxMessage;
/// A message that was rejected while a batch was being processed.
///
/// Derives `PartialEq`/`Eq` so rejections can be compared directly (for
/// example in assertions) instead of field by field.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Rejection {
    /// Zero-based position of the rejected message within the batch slice.
    pub index: usize,
    /// Free-form explanation supplied by the code that rejected the message.
    pub reason: String,
}
/// Cursor over a borrowed slice of outbox messages that also tracks how many
/// messages were processed, which ones were rejected, and a deadline.
pub struct Batch<'a> {
/// Messages in this batch, borrowed for `'a`.
msgs: &'a [OutboxMessage],
/// Index into `msgs` of the next message to hand out.
cursor: usize,
/// Number of messages acknowledged or rejected so far.
processed: u32,
/// Rejections recorded so far, in the order they were reported.
rejections: Vec<Rejection>,
/// Instant against which `remaining()` measures the time left.
/// NOTE(review): presumably the moment the lease on these messages expires — confirm with the caller.
lease_deadline: Instant,
}
impl<'a> Batch<'a> {
    /// Creates a batch over `msgs` with the cursor at the first message, no
    /// progress recorded, and `lease_deadline` as the reference point for
    /// [`Batch::remaining`].
    pub(crate) fn new(msgs: &'a [OutboxMessage], lease_deadline: Instant) -> Self {
        Self {
            msgs,
            cursor: 0,
            processed: 0,
            rejections: Vec::new(),
            lease_deadline,
        }
    }

    /// Hands out the next message and advances the cursor by one.
    ///
    /// Returns `None` once every message has been handed out. The returned
    /// reference borrows the underlying slice (`'a`), not the batch itself,
    /// so it may be held while calling [`Batch::ack`] or [`Batch::reject`].
    pub fn next_msg(&mut self) -> Option<&'a OutboxMessage> {
        // Copy the slice reference out so the result is tied to `'a` rather
        // than to the `&mut self` borrow.
        let msgs: &'a [OutboxMessage] = self.msgs;
        let msg = msgs.get(self.cursor)?;
        self.cursor += 1;
        Some(msg)
    }

    /// Hands out up to `n` messages starting at the cursor and advances the
    /// cursor past them.
    ///
    /// The returned slice is shorter than `n` when fewer messages are left
    /// and empty when the batch is exhausted. Any `n` is accepted, including
    /// `usize::MAX`.
    pub fn next_chunk(&mut self, n: usize) -> &'a [OutboxMessage] {
        let msgs: &'a [OutboxMessage] = self.msgs;
        let start = self.cursor;
        // Saturate: `start + n` would overflow for very large `n`.
        let end = start.saturating_add(n).min(msgs.len());
        self.cursor = end;
        &msgs[start..end]
    }

    /// Records one message as processed.
    pub fn ack(&mut self) {
        self.processed = self.processed.saturating_add(1);
    }

    /// Records `count` messages as processed.
    ///
    /// The counter saturates at `u32::MAX` instead of wrapping.
    pub fn ack_chunk(&mut self, count: u32) {
        self.processed = self.processed.saturating_add(count);
    }

    /// Records a rejection with `reason` and counts the message as processed.
    ///
    /// The rejection is attributed to the message just before the cursor,
    /// i.e. the one most recently returned by [`Batch::next_msg`], or the
    /// last message of the most recent [`Batch::next_chunk`]. If nothing has
    /// been handed out yet, index `0` is recorded.
    pub fn reject(&mut self, reason: String) {
        let index = self.cursor.saturating_sub(1);
        self.rejections.push(Rejection { index, reason });
        self.processed = self.processed.saturating_add(1);
    }

    /// Time left until the lease deadline, or [`Duration::ZERO`] if the
    /// deadline has already passed.
    #[must_use]
    pub fn remaining(&self) -> Duration {
        self.lease_deadline
            .saturating_duration_since(Instant::now())
    }

    /// Number of messages that have not been handed out yet.
    #[must_use]
    pub fn len(&self) -> usize {
        self.msgs.len().saturating_sub(self.cursor)
    }

    /// Returns `true` when every message has been handed out.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.cursor >= self.msgs.len()
    }

    /// Number of messages acknowledged or rejected so far.
    #[must_use]
    pub fn processed(&self) -> u32 {
        self.processed
    }

    /// Rejections recorded so far, in the order they were reported.
    pub(crate) fn rejections(&self) -> &[Rejection] {
        &self.rejections
    }
}
#[cfg(test)]
#[cfg_attr(coverage_nightly, coverage(off))]
mod tests {
    use super::*;
    use crate::outbox::handler::OutboxMessage;

    /// Builds a minimal message whose only distinguishing field is `seq`.
    fn make_msg(seq: i64) -> OutboxMessage {
        OutboxMessage {
            partition_id: 1,
            seq,
            payload: vec![],
            payload_type: "test".into(),
            created_at: chrono::Utc::now(),
            attempts: 0,
        }
    }

    /// Builds messages with sequence numbers `1..=count`.
    fn make_msgs(count: i64) -> Vec<OutboxMessage> {
        (1..=count).map(make_msg).collect()
    }

    /// A deadline `secs` seconds from now.
    fn deadline_in(secs: u64) -> Instant {
        Instant::now() + Duration::from_secs(secs)
    }

    #[test]
    fn next_iterates_all_messages() {
        let fixture = make_msgs(3);
        let mut batch = Batch::new(&fixture, deadline_in(30));

        assert_eq!(batch.len(), 3);
        assert!(!batch.is_empty());

        // Messages come out in slice order, one per call.
        for expected_seq in 1..=3 {
            assert_eq!(batch.next_msg().unwrap().seq, expected_seq);
        }

        assert!(batch.next_msg().is_none());
        assert!(batch.is_empty());
    }

    #[test]
    fn next_chunk_returns_correct_slices() {
        let fixture = make_msgs(7);
        let mut batch = Batch::new(&fixture, deadline_in(30));

        // (expected chunk length, seq of the chunk's first message)
        for (expected_len, expected_first_seq) in [(3, 1), (3, 4), (1, 7)] {
            let chunk = batch.next_chunk(3);
            assert_eq!(chunk.len(), expected_len);
            assert_eq!(chunk[0].seq, expected_first_seq);
        }

        assert!(batch.next_chunk(3).is_empty());
    }

    #[test]
    fn ack_and_ack_chunk_track_progress() {
        let fixture = make_msgs(5);
        let mut batch = Batch::new(&fixture, deadline_in(30));
        assert_eq!(batch.processed(), 0);

        let _ = batch.next_msg();
        batch.ack();
        assert_eq!(batch.processed(), 1);

        let _ = batch.next_chunk(3);
        batch.ack_chunk(3);
        assert_eq!(batch.processed(), 4);
    }

    #[test]
    fn reject_tracks_rejection_and_progress() {
        let fixture = make_msgs(3);
        let mut batch = Batch::new(&fixture, deadline_in(30));

        // ack, reject, ack — the rejection lands on the second message.
        let _ = batch.next_msg();
        batch.ack();
        let _ = batch.next_msg();
        batch.reject("bad payload".into());
        let _ = batch.next_msg();
        batch.ack();

        assert_eq!(batch.processed(), 3);

        let rejected = batch.rejections();
        assert_eq!(rejected.len(), 1);
        assert_eq!(rejected[0].index, 1);
        assert_eq!(rejected[0].reason, "bad payload");
    }

    #[test]
    fn remaining_returns_time_until_deadline() {
        let fixture: Vec<OutboxMessage> = Vec::new();
        let batch = Batch::new(&fixture, deadline_in(10));

        let left = batch.remaining();
        assert!(left > Duration::from_secs(9));
        assert!(left <= Duration::from_secs(10));
    }

    #[test]
    fn remaining_returns_zero_when_past_deadline() {
        let fixture: Vec<OutboxMessage> = Vec::new();
        // A deadline of "now" is already reached by the time `remaining` runs.
        let batch = Batch::new(&fixture, Instant::now());

        assert_eq!(batch.remaining(), Duration::ZERO);
    }
}