use std::time::Duration;
use crate::{Processor, batch_queue::BatchQueue};
use super::{OnAdd, OnFinish, OnFull};
/// Decides how a duration-based policy reacts to a new item being added.
///
/// * The first item of a fresh batch schedules processing after `duration`.
/// * Items that leave room in the batch are simply added.
/// * The item that fills the batch triggers immediate processing when
///   `on_full` is [`OnFull::Process`] and capacity remains; otherwise it is
///   just added (rejection of overflow is handled elsewhere).
pub(super) fn on_add<P: Processor>(
    duration: Duration,
    on_full: OnFull,
    batch_queue: &BatchQueue<P>,
) -> OnAdd {
    if batch_queue.adding_to_new_batch() {
        // New batch: start its timeout clock.
        OnAdd::AddAndProcessAfter(duration)
    } else if !batch_queue.last_space_in_batch() {
        // Room remains after this item; nothing else to do.
        OnAdd::Add
    } else if matches!(on_full, OnFull::Process)
        && !batch_queue.at_max_total_processing_capacity()
    {
        // This item fills the batch and we may process it right away.
        OnAdd::AddAndProcess
    } else {
        // Batch becomes full but either policy is Reject or we are at
        // capacity: add now, let a later event start processing.
        OnAdd::Add
    }
}
/// Decides what to do once a batch finishes processing: kick off the next
/// batch if its timeout has already expired or it is full, otherwise wait.
pub(super) fn on_finish<P: Processor>(batch_queue: &BatchQueue<P>) -> OnFinish {
    let next_is_ready =
        batch_queue.has_next_batch_timeout_expired() || batch_queue.is_next_batch_full();
    if next_is_ready {
        OnFinish::ProcessNext
    } else {
        OnFinish::DoNothing
    }
}
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use assert_matches::assert_matches;
    use tokio::sync::{Notify, mpsc};

    use crate::{
        Limits,
        batch_inner::Generation,
        batch_queue::BatchQueue,
        error::{ConcurrencyStatus, RejectionReason},
        policies::BatchingPolicy,
        worker::Message,
    };

    use super::super::test_utils::*;
    use super::*;

    /// The first item added to an empty queue starts a new batch, which
    /// must schedule processing after the policy's duration.
    #[test]
    fn schedules_timeout_when_adding_to_empty() {
        let limits = Limits::builder().max_batch_size(2).build();
        let queue = BatchQueue::<TestProcessor>::new("test".to_string(), "key".to_string(), limits);

        let duration = Duration::from_millis(100);
        let policy = BatchingPolicy::Duration(duration, OnFull::Process);

        let result = policy.on_add(&queue);

        assert_matches!(result, OnAdd::AddAndProcessAfter(d) if d == duration);
    }

    /// With `OnFull::Reject` and both the batch and the batch queue at
    /// capacity, a further add must be rejected even though processing
    /// capacity is still available.
    #[test]
    fn onfull_reject_rejects_when_full_but_not_processing() {
        let limits = Limits::builder()
            .max_batch_size(1)
            .max_key_concurrency(1)
            .max_batch_queue_size(1)
            .build();
        let mut queue =
            BatchQueue::<TestProcessor>::new("test".to_string(), "key".to_string(), limits);
        queue.push(new_item("key".to_string(), "item1".to_string()));

        let policy = BatchingPolicy::Duration(Duration::from_millis(100), OnFull::Reject);

        let result = policy.on_add(&queue);

        assert_matches!(
            result,
            OnAdd::Reject(RejectionReason::BatchQueueFull(
                ConcurrencyStatus::Available
            ))
        );
    }

    /// A batch whose timeout expires while an earlier batch is still
    /// processing must not start immediately; it runs once the earlier
    /// batch finishes and frees key concurrency.
    #[tokio::test]
    async fn timeout_while_processing() {
        let processor = ControlledProcessor::default();
        let limits = Limits::builder()
            .max_batch_size(2)
            .max_key_concurrency(1)
            .build();
        let mut queue = BatchQueue::<ControlledProcessor>::new("test".to_string(), (), limits);
        let policy = BatchingPolicy::Duration(Duration::from_millis(100), OnFull::Process);

        // First add opens a fresh batch and schedules its timeout.
        let result = policy.on_add(&queue);
        assert_matches!(result, OnAdd::AddAndProcessAfter(_));

        let notify1 = Arc::new(Notify::new());
        queue.push(new_item((), Arc::clone(&notify1).notified_owned()));

        // Second add fills the batch; capacity is free, so process now.
        let result = policy.on_add(&queue);
        assert_matches!(result, OnAdd::AddAndProcess);
        queue.push(new_item((), Arc::clone(&notify1).notified_owned()));

        let (on_finished, mut rx) = mpsc::channel(1);
        queue.process_next_ready_batch(processor, on_finished);

        // While the first batch processes, a new (second-generation)
        // batch opens and schedules its own timeout.
        let result = policy.on_add(&queue);
        assert_matches!(result, OnAdd::AddAndProcessAfter(_));
        let notify2 = Arc::new(Notify::new());
        queue.push(new_item((), notify2.notified_owned()));

        let (tx, mut timeout_rx) = mpsc::channel(1);
        queue.process_after(Duration::from_millis(1), tx);

        // The timeout message should carry the second batch's generation.
        let msg = timeout_rx.recv().await.unwrap();
        let second_gen = Generation::default().next();
        assert_matches!(msg, Message::TimedOut(_, generation) => {
            assert_eq!(generation, second_gen);
        });

        // Key concurrency is exhausted by the in-flight batch, so the
        // expired timeout cannot start processing yet.
        let result = policy.on_timeout(second_gen, &queue);
        assert_matches!(result, super::super::OnGenerationEvent::DoNothing);

        // Release the first batch and wait for it to report finished.
        notify1.notify_waiters();
        let msg = rx.recv().await.unwrap();
        assert_matches!(msg, Message::Finished(_, _));
        queue.mark_processed();

        // Finishing frees capacity; the timed-out batch must run next.
        let result = policy.on_finish(&queue);
        assert_matches!(result, OnFinish::ProcessNext);
    }
}