use crate::{Processor, batch_queue::BatchQueue};
use super::{OnAdd, OnFinish};
pub(super) fn on_add<P: Processor>(min_size_hint: usize, batch_queue: &BatchQueue<P>) -> OnAdd {
if batch_queue.at_max_total_processing_capacity() {
return OnAdd::Add;
}
if batch_queue.adding_to_new_batch() && !batch_queue.is_processing() {
return OnAdd::AddAndAcquireResources;
}
if batch_queue.has_last_batch_reached_size(min_size_hint.saturating_sub(1))
&& !batch_queue.is_last_batch_acquiring_resources()
{
return OnAdd::AddAndAcquireResources;
}
OnAdd::Add
}
pub(super) fn on_finish<P: Processor>(batch_queue: &BatchQueue<P>) -> OnFinish {
if batch_queue.has_batch_ready() {
OnFinish::ProcessNextReady
} else {
OnFinish::DoNothing
}
}
#[cfg(test)]
mod tests {
use assert_matches::assert_matches;
use crate::{
Limits,
batch_queue::BatchQueue,
error::{ConcurrencyStatus, RejectionReason},
policies::BatchingPolicy,
};
use super::super::test_utils::*;
use super::*;
#[test]
fn acquires_resources_when_nothing_processing() {
let limits = Limits::builder()
.max_batch_size(10)
.max_key_concurrency(1)
.build();
let queue = BatchQueue::<TestProcessor>::new("test".to_string(), "key".to_string(), limits);
let policy = BatchingPolicy::Balanced { min_size_hint: 5 };
let result = policy.on_add(&queue);
assert_matches!(result, OnAdd::AddAndAcquireResources);
}
#[tokio::test]
async fn waits_when_below_hint_and_processing() {
let limits = Limits::builder()
.max_batch_size(10)
.max_key_concurrency(1)
.build();
let mut queue =
BatchQueue::<TestProcessor>::new("test".to_string(), "key".to_string(), limits);
queue.push(new_item("key".to_string(), "item1".to_string()));
let (on_finished, _rx) = tokio::sync::mpsc::channel(1);
queue.process_next_ready_batch(TestProcessor, on_finished);
let policy = BatchingPolicy::Balanced { min_size_hint: 5 };
let result = policy.on_add(&queue);
assert_matches!(result, OnAdd::Add);
}
#[tokio::test]
async fn acquires_when_reached_hint() {
let limits = Limits::builder()
.max_batch_size(10)
.max_key_concurrency(2)
.build();
let mut queue =
BatchQueue::<TestProcessor>::new("test".to_string(), "key".to_string(), limits);
queue.push(new_item("key".to_string(), "item1".to_string()));
let (on_finished, _rx) = tokio::sync::mpsc::channel(1);
queue.process_next_ready_batch(TestProcessor, on_finished);
for i in 2..=5 {
queue.push(new_item("key".to_string(), format!("item{}", i)));
}
let policy = BatchingPolicy::Balanced { min_size_hint: 5 };
let result = policy.on_add(&queue);
assert_matches!(result, OnAdd::AddAndAcquireResources);
}
#[tokio::test]
async fn rejects_at_max_capacity() {
let limits = Limits::builder()
.max_batch_size(5)
.max_key_concurrency(1)
.max_batch_queue_size(1)
.build();
let mut queue =
BatchQueue::<TestProcessor>::new("test".to_string(), "key".to_string(), limits);
for i in 1..=5 {
queue.push(new_item("key".to_string(), format!("item{}", i)));
}
let (on_finished, _rx) = tokio::sync::mpsc::channel(1);
queue.process_next_ready_batch(TestProcessor, on_finished);
for i in 6..=10 {
queue.push(new_item("key".to_string(), format!("item{}", i)));
}
let policy = BatchingPolicy::Balanced { min_size_hint: 5 };
let result = policy.on_add(&queue);
assert_matches!(
result,
OnAdd::Reject(RejectionReason::BatchQueueFull(ConcurrencyStatus::MaxedOut))
);
}
#[test]
fn processes_on_finish_when_batch_ready() {
let limits = Limits::builder()
.max_batch_size(10)
.max_key_concurrency(1)
.build();
let mut queue =
BatchQueue::<TestProcessor>::new("test".to_string(), "key".to_string(), limits);
for i in 1..=3 {
queue.push(new_item("key".to_string(), format!("item{}", i)));
}
let policy = BatchingPolicy::Balanced { min_size_hint: 5 };
let result = policy.on_finish(&queue);
assert_matches!(result, OnFinish::ProcessNextReady);
}
#[test]
fn does_not_process_on_finish_when_no_batch_ready() {
let limits = Limits::builder().max_batch_size(10).build();
let queue = BatchQueue::<TestProcessor>::new("test".to_string(), "key".to_string(), limits);
let policy = BatchingPolicy::Balanced { min_size_hint: 5 };
let result = policy.on_finish(&queue);
assert_matches!(result, OnFinish::DoNothing);
}
}