use sea_orm::ConnectionTrait;
use tokio_util::sync::CancellationToken;
use tracing::debug;
use super::super::handler::HandlerResult;
use super::super::strategy::{ProcessContext, ProcessingStrategy};
use super::super::taskward::{Directive, WorkerAction};
use super::super::types::{OutboxError, QueueConfig};
use crate::Db;
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub struct ProcessorReport {
pub partition_id: i64,
pub messages_processed: u32,
pub handler_result: HandlerResult,
}
#[derive(Debug, Clone)]
pub enum PartitionMode {
Normal,
Degraded {
effective_size: u32,
consecutive_successes: u32,
},
}
impl PartitionMode {
pub(crate) fn effective_batch_size(&self, configured: u32) -> u32 {
match self {
Self::Normal => configured,
Self::Degraded { effective_size, .. } => *effective_size,
}
}
pub(crate) fn transition(
&mut self,
result: &HandlerResult,
configured_batch_size: u32,
processed_count: Option<u32>,
) {
match result {
HandlerResult::Success => match self {
Self::Normal => {}
Self::Degraded {
effective_size,
consecutive_successes,
} => {
*consecutive_successes += 1;
let next = effective_size.saturating_mul(2).min(configured_batch_size);
if next >= configured_batch_size {
*self = Self::Normal;
} else {
*effective_size = next;
}
}
},
HandlerResult::Retry { .. } | HandlerResult::Reject { .. } => {
let degrade_to = processed_count.map_or(1, |pc| pc.max(1));
*self = Self::Degraded {
effective_size: degrade_to,
consecutive_successes: 0,
};
}
}
}
}
pub struct PartitionProcessor<S: ProcessingStrategy> {
strategy: S,
partition_id: i64,
config: QueueConfig,
db: Db,
partition_mode: PartitionMode,
}
impl<S: ProcessingStrategy> PartitionProcessor<S> {
pub fn new(strategy: S, partition_id: i64, config: QueueConfig, db: Db) -> Self {
Self {
strategy,
partition_id,
config,
db,
partition_mode: PartitionMode::Normal,
}
}
pub fn poll_interval(&self) -> std::time::Duration {
self.config.poll_interval
}
}
impl<S: ProcessingStrategy> WorkerAction for PartitionProcessor<S> {
type Payload = ProcessorReport;
type Error = OutboxError;
async fn execute(
&mut self,
cancel: &CancellationToken,
) -> Result<Directive<ProcessorReport>, OutboxError> {
let (backend, dialect) = {
let sea_conn = self.db.sea_internal();
let b = sea_conn.get_database_backend();
(b, super::super::dialect::Dialect::from(b))
};
let effective_size = self
.partition_mode
.effective_batch_size(self.config.msg_batch_size);
let ctx = ProcessContext {
db: &self.db,
backend,
dialect,
partition_id: self.partition_id,
};
let mut effective_config = self.config.clone();
effective_config.msg_batch_size = effective_size;
let child_cancel = cancel.child_token();
let result = self
.strategy
.process(&ctx, &effective_config, child_cancel)
.await?;
if let Some(pr) = result {
let has_more = pr.count >= effective_size;
let clamped_pc = pr.processed_count.map(|pc| pc.min(pr.count));
self.partition_mode.transition(
&pr.handler_result,
self.config.msg_batch_size,
clamped_pc,
);
if pr.count > 0 {
debug!(
partition_id = self.partition_id,
count = pr.count,
mode = ?self.partition_mode,
"partition batch complete"
);
}
let report = ProcessorReport {
partition_id: self.partition_id,
messages_processed: pr.count,
handler_result: pr.handler_result,
};
if has_more {
Ok(Directive::Proceed(report))
} else {
Ok(Directive::Idle(report))
}
} else {
Ok(Directive::Idle(ProcessorReport {
partition_id: self.partition_id,
messages_processed: 0,
handler_result: HandlerResult::Success,
}))
}
}
}
#[cfg(test)]
#[cfg_attr(coverage_nightly, coverage(off))]
mod tests {
use super::*;
#[test]
fn partition_mode_normal_uses_configured_size() {
let mode = PartitionMode::Normal;
assert_eq!(mode.effective_batch_size(50), 50);
}
#[test]
fn partition_mode_degraded_uses_effective_size() {
let mode = PartitionMode::Degraded {
effective_size: 4,
consecutive_successes: 2,
};
assert_eq!(mode.effective_batch_size(50), 4);
}
#[test]
fn partition_mode_retry_degrades_to_one() {
let mut mode = PartitionMode::Normal;
mode.transition(
&HandlerResult::Retry {
reason: "fail".into(),
},
50,
None, );
assert!(matches!(
mode,
PartitionMode::Degraded {
effective_size: 1,
consecutive_successes: 0,
}
));
}
#[test]
fn partition_mode_success_ramps_up() {
let mut mode = PartitionMode::Degraded {
effective_size: 1,
consecutive_successes: 0,
};
mode.transition(&HandlerResult::Success, 50, None);
assert!(matches!(
mode,
PartitionMode::Degraded {
effective_size: 2,
consecutive_successes: 1,
}
));
mode.transition(&HandlerResult::Success, 50, None);
assert!(matches!(
mode,
PartitionMode::Degraded {
effective_size: 4,
..
}
));
mode.transition(&HandlerResult::Success, 50, None);
assert!(matches!(
mode,
PartitionMode::Degraded {
effective_size: 8,
..
}
));
}
#[test]
fn partition_mode_ramps_up_to_normal() {
let mut mode = PartitionMode::Degraded {
effective_size: 16,
consecutive_successes: 4,
};
mode.transition(&HandlerResult::Success, 32, None);
assert!(matches!(mode, PartitionMode::Normal));
}
#[test]
fn partition_mode_reject_in_normal_degrades() {
let mut mode = PartitionMode::Normal;
mode.transition(
&HandlerResult::Reject {
reason: "bad".into(),
},
50,
None, );
assert!(matches!(
mode,
PartitionMode::Degraded {
effective_size: 1,
consecutive_successes: 0,
}
));
}
#[test]
fn partition_mode_reject_with_processed_count() {
let mut mode = PartitionMode::Normal;
mode.transition(
&HandlerResult::Reject {
reason: "bad".into(),
},
50,
Some(3), );
assert!(matches!(
mode,
PartitionMode::Degraded {
effective_size: 3,
consecutive_successes: 0,
}
));
}
#[test]
fn partition_mode_retry_with_processed_count_zero() {
let mut mode = PartitionMode::Normal;
mode.transition(
&HandlerResult::Retry {
reason: "fail".into(),
},
50,
Some(0), );
assert!(matches!(
mode,
PartitionMode::Degraded {
effective_size: 1,
consecutive_successes: 0,
}
));
}
#[test]
fn partition_mode_success_in_normal_stays_normal() {
let mut mode = PartitionMode::Normal;
mode.transition(&HandlerResult::Success, 50, None);
assert!(matches!(mode, PartitionMode::Normal));
}
#[test]
fn partition_mode_full_recovery_cycle() {
let mut mode = PartitionMode::Normal;
mode.transition(&HandlerResult::Retry { reason: "x".into() }, 8, None);
assert_eq!(mode.effective_batch_size(8), 1);
mode.transition(&HandlerResult::Success, 8, None);
assert_eq!(mode.effective_batch_size(8), 2);
mode.transition(&HandlerResult::Success, 8, None);
assert_eq!(mode.effective_batch_size(8), 4);
mode.transition(&HandlerResult::Success, 8, None);
assert!(matches!(mode, PartitionMode::Normal));
assert_eq!(mode.effective_batch_size(8), 8);
}
}