use std::{collections::VecDeque, fmt::Debug, time::Duration};
use tokio::sync::mpsc;
use tracing::warn;
use crate::{
BatchError, Limits,
batch::{Batch, BatchItem},
batch_inner::Generation,
processor::Processor,
worker::Message,
};
pub(crate) struct BatchQueue<P: Processor> {
batcher_name: String,
queue: VecDeque<Batch<P>>,
limits: Limits,
pre_acquiring: usize,
processing: usize,
}
impl<P: Processor> BatchQueue<P> {
pub(crate) fn new(batcher_name: String, key: P::Key, limits: Limits) -> Self {
let mut queue = VecDeque::with_capacity(limits.max_batch_queue_size);
let processing = 0;
let pre_acquiring = 0;
queue.push_back(Batch::new(batcher_name.clone(), key));
Self {
batcher_name,
queue,
limits,
pre_acquiring,
processing,
}
}
pub(crate) fn is_next_batch_full(&self) -> bool {
let next = self.queue.front().expect("Should always be non-empty");
next.is_full(self.limits.max_batch_size)
}
pub(crate) fn has_last_batch_reached_size(&self, size: usize) -> bool {
let last = self.queue.back().expect("Should always be non-empty");
last.len() >= size
}
pub(crate) fn is_last_batch_acquiring_resources(&self) -> bool {
let last = self.queue.back().expect("Should always be non-empty");
last.has_started_acquiring()
}
pub(crate) fn has_next_batch_timeout_expired(&self) -> bool {
let next = self.queue.front().expect("Should always be non-empty");
next.has_timeout_expired()
}
pub(crate) fn is_full(&self) -> bool {
let back = self.queue.back().expect("Should always be non-empty");
self.queue.len() >= self.limits.max_batch_queue_size
&& back.len() >= self.limits.max_batch_size
}
pub(crate) fn is_empty(&self) -> bool {
self.queue.len() == 1
&& self
.queue
.front()
.expect("Should always be non-empty")
.is_empty()
}
pub(crate) fn last_space_in_batch(&self) -> bool {
let back = self.queue.back().expect("Should always be non-empty");
back.has_single_space(self.limits.max_batch_size)
}
pub(crate) fn adding_to_new_batch(&self) -> bool {
let back = self.queue.back().expect("Should always be non-empty");
back.is_new_batch() || back.is_full(self.limits.max_batch_size)
}
pub(crate) fn is_processing(&self) -> bool {
self.processing > 0
}
pub(crate) fn mark_processed(&mut self) {
if self.processing == 0 {
warn!("processing count should never go below zero");
}
debug_assert!(
self.processing > 0,
"processing count should never go below zero"
);
self.processing = self.processing.saturating_sub(1);
}
fn increment_processing_count(&mut self) {
self.processing += 1;
if self.processing > self.limits.max_key_concurrency {
warn!("Processing count should not exceed max key concurrency");
}
debug_assert!(
self.processing <= self.limits.max_key_concurrency,
"Processing count should not exceed max key concurrency"
);
}
pub(crate) fn mark_resource_acquisition_finished(&mut self) {
if self.pre_acquiring == 0 {
warn!("pre-acquiring count should never go below zero");
}
debug_assert!(
self.pre_acquiring > 0,
"pre-acquiring count should never go below zero"
);
self.pre_acquiring = self.pre_acquiring.saturating_sub(1);
}
fn increment_resource_acquisition_count(&mut self) {
self.pre_acquiring += 1;
if self.pre_acquiring > self.limits.max_key_concurrency {
warn!("pre-acquiring count should not exceed max key concurrency");
}
debug_assert!(
self.pre_acquiring <= self.limits.max_key_concurrency,
"pre-acquiring count should not exceed max key concurrency"
);
}
pub(crate) fn at_max_total_processing_capacity(&self) -> bool {
self.pre_acquiring + self.processing >= self.limits.max_key_concurrency
}
pub(crate) fn push(&mut self, item: BatchItem<P>) {
let back = self.queue.back_mut().expect("Should always be non-empty");
if back.is_full(self.limits.max_batch_size) {
let mut new_back = back.new_generation();
new_back.push(item);
self.queue.push_back(new_back);
} else {
back.push(item);
}
}
pub(crate) fn has_batch_ready(&self) -> bool {
for batch in &self.queue {
if batch.is_ready() {
return true;
}
}
false
}
pub(crate) fn is_generation_ready(&self, generation: Generation) -> bool {
for batch in &self.queue {
if batch.is_generation(generation) {
return batch.is_ready();
}
}
false
}
fn take_next_batch(&mut self) -> Option<Batch<P>> {
let batch = self.queue.pop_front().expect("Should always be non-empty");
if self.queue.is_empty() {
self.queue.push_back(batch.new_generation())
}
Some(batch)
}
pub(crate) fn take_next_ready_batch(&mut self) -> Option<Batch<P>> {
for (index, batch) in self.queue.iter().enumerate() {
if batch.is_ready() {
let batch = self
.queue
.remove(index)
.expect("Should exist, we just found it");
if self.queue.is_empty() {
self.queue.push_back(batch.new_generation())
}
return Some(batch);
}
}
None
}
pub(crate) fn process_next_ready_batch(
&mut self,
processor: P,
on_finished: mpsc::Sender<Message<P::Key, P::Error>>,
) {
let Some(batch) = self.take_next_ready_batch() else {
warn!(
"No ready batch found in batch queue '{}'",
self.batcher_name
);
debug_assert!(
false,
"No ready batch found in batch queue '{}'",
self.batcher_name
);
return;
};
self.increment_processing_count();
batch.process(processor, on_finished);
}
pub(crate) fn process_next_batch(
&mut self,
processor: P,
on_finished: mpsc::Sender<Message<P::Key, P::Error>>,
) {
let Some(batch) = self.take_next_batch() else {
warn!("No next batch found in batch queue '{}'", self.batcher_name);
debug_assert!(
false,
"No next batch found in batch queue '{}'",
self.batcher_name
);
return;
};
self.increment_processing_count();
batch.process(processor, on_finished);
}
fn take_generation(&mut self, generation: Generation) -> Option<Batch<P>> {
for (index, batch) in self.queue.iter().enumerate() {
if batch.is_generation(generation) {
let batch = self
.queue
.remove(index)
.expect("Should exist, we just found it");
if self.queue.is_empty() {
self.queue.push_back(batch.new_generation())
}
return Some(batch);
}
}
None
}
pub(crate) fn process_generation(
&mut self,
generation: Generation,
processor: P,
tx: mpsc::Sender<Message<P::Key, P::Error>>,
) {
let Some(batch) = self.take_generation(generation) else {
warn!(
"No batch found for generation {:?} in batch queue '{}'",
generation, self.batcher_name
);
debug_assert!(
false,
"No batch found for generation {:?} in batch queue '{}'",
generation, self.batcher_name
);
return;
};
self.increment_processing_count();
batch.process(processor, tx);
}
pub(crate) fn fail_generation(
&mut self,
generation: Generation,
error: BatchError<P::Error>,
tx: mpsc::Sender<Message<P::Key, P::Error>>,
) {
let Some(batch) = self.take_generation(generation) else {
warn!(
"No batch found for generation {:?} in batch queue '{}'",
generation, self.batcher_name
);
debug_assert!(
false,
"No batch found for generation {:?} in batch queue '{}'",
generation, self.batcher_name
);
return;
};
batch.fail(error, tx);
}
pub(crate) fn pre_acquire_resources(
&mut self,
processor: P,
tx: mpsc::Sender<Message<P::Key, P::Error>>,
) {
self.increment_resource_acquisition_count();
let Some(batch) = self
.queue
.iter_mut()
.find(|batch| !batch.has_started_acquiring())
else {
self.pre_acquiring -= 1;
warn!(
"No batch found needing resource acquisition in batch queue '{}'",
self.batcher_name
);
debug_assert!(
false,
"No batch found needing resource acquisition in batch queue '{}'",
self.batcher_name
);
return;
};
batch.pre_acquire_resources(processor, tx);
}
pub(crate) fn process_after(
&mut self,
duration: Duration,
tx: mpsc::Sender<Message<P::Key, P::Error>>,
) {
let back = self.queue.back_mut().expect("Should always be non-empty");
back.process_after(duration, tx);
}
}
impl<P: Processor> Debug for BatchQueue<P> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let Self {
batcher_name,
queue,
limits,
processing,
pre_acquiring,
} = self;
f.debug_struct("BatchQueue")
.field("batcher_name", &batcher_name)
.field("queue", &queue)
.field("processing", &processing)
.field("pre_acquiring", &pre_acquiring)
.field("limits", limits)
.finish()
}
}