use std::{
fmt::{self, Debug, Display},
time::Duration,
};
use tracing::warn;
use crate::{
Limits, Processor,
batch_inner::Generation,
batch_queue::BatchQueue,
error::{ConcurrencyStatus, RejectionReason},
};
mod balanced;
mod duration;
mod immediate;
mod size;
#[cfg(test)]
mod test_utils;
/// Strategy deciding when queued items are grouped into a batch and processed.
///
/// This enum only selects which behavior applies; the per-variant logic lives
/// in the `immediate`, `size`, `duration`, and `balanced` submodules, which
/// the `on_add`/`on_finish` dispatchers delegate to.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum BatchingPolicy {
    /// Delegates to the `immediate` submodule — presumably processes items as
    /// they arrive, subject to capacity. See `immediate::on_add`.
    Immediate,
    /// Delegates to the `size` submodule — presumably waits for a full batch.
    /// See `size::on_add`.
    Size,
    /// Delegates to the `duration` submodule: items are held up to the given
    /// `Duration`, and `OnFull` controls what happens if the batch fills
    /// before the timer fires.
    Duration(Duration, OnFull),
    /// Delegates to the `balanced` submodule.
    Balanced {
        /// Lower bound hint for batch size; clamped to `Limits::max_batch_size`
        /// by `BatchingPolicy::normalise`.
        min_size_hint: usize,
    },
}
/// What to do when an item arrives while the current batch is already full.
/// Consumed by the `Duration` policy (passed through to `duration::on_add`).
#[derive(Debug, Clone, Copy)]
#[non_exhaustive]
pub enum OnFull {
    /// Process the full batch rather than waiting for the timer.
    /// NOTE(review): exact semantics are implemented in the `duration`
    /// submodule — confirm there.
    Process,
    /// Reject the incoming item instead of processing early.
    /// NOTE(review): exact semantics are implemented in the `duration`
    /// submodule — confirm there.
    Reject,
}
/// Instruction returned by [`BatchingPolicy::on_add`] telling the caller how
/// to handle a newly submitted item. Variant names describe the intended
/// caller action; the handling itself is implemented by the caller.
#[derive(Debug)]
pub(crate) enum OnAdd {
    /// Add the item and process the batch immediately.
    AddAndProcess,
    /// Add the item and start acquiring processing resources
    /// (eventually leading to `on_resources_acquired`).
    AddAndAcquireResources,
    /// Add the item and schedule processing after the given delay.
    AddAndProcessAfter(Duration),
    /// Do not add the item; reject it with the given reason.
    Reject(RejectionReason),
    /// Add the item and take no further action.
    Add,
}
/// Instruction returned by [`BatchingPolicy::on_timeout`] and
/// [`BatchingPolicy::on_resources_acquired`] for a specific batch generation.
#[derive(Debug)]
pub(crate) enum OnGenerationEvent {
    /// The generation is ready and processing capacity is available:
    /// process it now.
    Process,
    /// Take no action (generation not ready, or already at max total
    /// processing capacity).
    DoNothing,
}
/// Instruction returned by [`BatchingPolicy::on_finish`] after a batch
/// completes processing.
#[derive(Debug)]
pub(crate) enum OnFinish {
    /// Process the next batch.
    ProcessNext,
    /// Process the next batch only if it is ready.
    /// NOTE(review): the exact distinction from `ProcessNext` is defined in
    /// the per-policy submodules — confirm there.
    ProcessNextReady,
    /// Nothing further to do.
    DoNothing,
}
impl Display for BatchingPolicy {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
BatchingPolicy::Immediate => write!(f, "Immediate"),
BatchingPolicy::Size => write!(f, "Size"),
BatchingPolicy::Duration(duration, on_full) => {
write!(f, "Duration({}ms, {:?})", duration.as_millis(), on_full)
}
BatchingPolicy::Balanced { min_size_hint } => {
write!(f, "Balanced(min_size: {})", min_size_hint)
}
}
}
}
impl BatchingPolicy {
    /// Clamps policy parameters so they are consistent with `limits`.
    ///
    /// Only `Balanced` carries a tunable parameter: its `min_size_hint` is
    /// capped at `limits.max_batch_size`. All other variants pass through
    /// unchanged.
    pub(crate) fn normalise(self, limits: Limits) -> Self {
        if let BatchingPolicy::Balanced { min_size_hint } = self {
            BatchingPolicy::Balanced {
                min_size_hint: min_size_hint.min(limits.max_batch_size),
            }
        } else {
            self
        }
    }

    /// Decides how the caller should handle a newly submitted item.
    ///
    /// A full queue is rejected up front; otherwise the decision is delegated
    /// to the submodule implementing this policy variant.
    pub(crate) fn on_add<P: Processor>(&self, batch_queue: &BatchQueue<P>) -> OnAdd {
        match self.should_reject(batch_queue) {
            Some(reason) => OnAdd::Reject(reason),
            None => match self {
                Self::Immediate => immediate::on_add(batch_queue),
                Self::Size => size::on_add(batch_queue),
                Self::Duration(dur, on_full) => duration::on_add(*dur, *on_full, batch_queue),
                Self::Balanced { min_size_hint } => balanced::on_add(*min_size_hint, batch_queue),
            },
        }
    }

    /// Returns a rejection reason when the queue cannot accept another item,
    /// or `None` if the item may be added.
    ///
    /// The reason records whether total processing capacity was also
    /// exhausted at the time of rejection.
    fn should_reject<P: Processor>(&self, batch_queue: &BatchQueue<P>) -> Option<RejectionReason> {
        if !batch_queue.is_full() {
            return None;
        }
        let status = if batch_queue.at_max_total_processing_capacity() {
            ConcurrencyStatus::MaxedOut
        } else {
            ConcurrencyStatus::Available
        };
        Some(RejectionReason::BatchQueueFull(status))
    }

    /// Handles a batch timer firing for `generation`.
    ///
    /// Silently does nothing when already at max total processing capacity;
    /// otherwise processes the generation if it is ready.
    pub(crate) fn on_timeout<P: Processor>(
        &self,
        generation: Generation,
        batch_queue: &BatchQueue<P>,
    ) -> OnGenerationEvent {
        if !batch_queue.at_max_total_processing_capacity() {
            return Self::process_generation_if_ready(generation, batch_queue);
        }
        OnGenerationEvent::DoNothing
    }

    /// Handles completion of resource acquisition for `generation`.
    ///
    /// Being at max capacity here indicates a caller bug: resources should
    /// not have been acquired in that state, so log in release builds and
    /// fail loudly in debug builds.
    pub(crate) fn on_resources_acquired<P: Processor>(
        &self,
        generation: Generation,
        batch_queue: &BatchQueue<P>,
    ) -> OnGenerationEvent {
        if !batch_queue.at_max_total_processing_capacity() {
            return Self::process_generation_if_ready(generation, batch_queue);
        }
        warn!("on_resources_acquired called when at max processing capacity");
        debug_assert!(
            false,
            "on_resources_acquired called when at max processing capacity"
        );
        OnGenerationEvent::DoNothing
    }

    /// Decides what to do after a batch finishes processing.
    ///
    /// Being at max capacity here indicates a caller bug (a slot should have
    /// just freed up), handled the same way as in `on_resources_acquired`.
    /// Otherwise the decision is delegated to the policy's submodule.
    pub(crate) fn on_finish<P: Processor>(&self, batch_queue: &BatchQueue<P>) -> OnFinish {
        if batch_queue.at_max_total_processing_capacity() {
            warn!("on_finish called when at max processing capacity");
            debug_assert!(false, "on_finish called when at max processing capacity");
            return OnFinish::DoNothing;
        }
        match self {
            Self::Immediate => immediate::on_finish(batch_queue),
            Self::Size => size::on_finish(batch_queue),
            Self::Duration(..) => duration::on_finish(batch_queue),
            Self::Balanced { .. } => balanced::on_finish(batch_queue),
        }
    }

    /// Shared helper: process `generation` only if the queue reports it ready.
    fn process_generation_if_ready<P: Processor>(
        generation: Generation,
        batch_queue: &BatchQueue<P>,
    ) -> OnGenerationEvent {
        match batch_queue.is_generation_ready(generation) {
            true => OnGenerationEvent::Process,
            false => OnGenerationEvent::DoNothing,
        }
    }
}