1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156
use std::{
fmt::{Debug, Display},
time::Duration,
};
use crate::{batch::Batch, error::RejectionReason};
/// A policy controlling when batches get processed.
///
/// The policy is evaluated together with [`Limits`]: `pre_add` is consulted before an item is
/// added to a batch, and `post_finish` decides whether the next batch should be processed.
#[derive(Debug)]
#[non_exhaustive]
pub enum BatchingPolicy {
/// Immediately process the batch if possible.
///
/// Will process as many batches concurrently as the limit allows. When concurrency is
/// maximised, as soon as a batch finishes the next batch will start. When concurrency is
/// limited to 1, it will run batches serially.
Immediate,
/// Process the batch when it reaches the maximum size.
///
/// The maximum size comes from [`Limits`]. If the key is already at its concurrency limit when
/// an item takes the last free slot, the item is still added but adding it does not trigger
/// processing.
Size,
/// Process the batch a given duration after it was created.
///
/// The [`OnFull`] value selects what happens if the batch fills up before the duration has
/// elapsed.
Duration(Duration, OnFull),
}
/// A policy controlling limits on batch sizes and concurrency.
///
/// New items will be rejected when both the limits have been reached.
///
/// The default limits are a batch size of 100 and a per-key concurrency of 10; use the builder
/// methods `max_batch_size` and `max_key_concurrency` to change them.
#[derive(Debug)]
#[non_exhaustive]
pub struct Limits {
/// Maximum size of a batch.
pub(crate) max_batch_size: usize,
/// Maximum number of batches that can be processed concurrently for a key.
pub(crate) max_key_concurrency: usize,
}
/// What to do when a batch becomes full.
///
/// Used as the second field of [`BatchingPolicy::Duration`].
#[derive(Debug)]
#[non_exhaustive]
pub enum OnFull {
/// Immediately attempt to process the batch. If the maximum concurrency has been reached for
/// the key, it will reject.
Process,
/// Reject any additional items. The batch will be processed when another condition is reached.
Reject,
}
/// The decision returned by `BatchingPolicy::pre_add` for an item that is about to be added.
pub enum PreAdd {
/// Add the item and process the batch.
///
/// Returned when the key is below its concurrency limit and either the policy is `Immediate`,
/// or the item takes the last free slot under `Size` or `Duration(_, OnFull::Process)`.
AddAndProcess,
/// Add the item and process the batch after the given duration.
///
/// Returned under the `Duration` policy for a new batch (when that batch is not also one item
/// away from full).
AddAndProcessAfter(Duration),
/// Do not add the item; the reason says which limit was hit.
///
/// Returned when the batch is already full.
Reject(RejectionReason),
/// Add the item without triggering any processing.
Add,
}
/// The decision returned by `BatchingPolicy::post_finish` about the next batch.
pub enum PostFinish {
/// Process the next batch.
///
/// Returned when the key is below its concurrency limit and either the policy is `Immediate`
/// or the next batch is full.
Process,
/// Leave the next batch as it is.
DoNothing,
}
impl Limits {
    /// Sets the maximum number of items a single batch may hold.
    ///
    /// Consumes `self` and returns the updated limits so that calls can be chained.
    pub fn max_batch_size(mut self, max: usize) -> Self {
        self.max_batch_size = max;
        self
    }

    /// Sets the maximum number of batches that may be processed concurrently for a key.
    ///
    /// Consumes `self` and returns the updated limits so that calls can be chained.
    pub fn max_key_concurrency(mut self, max: usize) -> Self {
        self.max_key_concurrency = max;
        self
    }
}
impl Default for Limits {
    /// Returns the default limits: batches of up to 100 items and up to 10 batches processed
    /// concurrently per key.
    fn default() -> Self {
        let max_batch_size = 100;
        let max_key_concurrency = 10;

        Self {
            max_batch_size,
            max_key_concurrency,
        }
    }
}
impl BatchingPolicy {
    /// Decides what should happen to an item that is about to join `batch`.
    ///
    /// Must be evaluated _before_ the item is actually added.
    ///
    /// A batch that is already full always rejects the item: with
    /// `RejectionReason::MaxConcurrency` when the key is at its concurrency limit, otherwise
    /// with `RejectionReason::BatchFull`. For any other batch the outcome depends on the policy:
    ///
    /// - `Immediate`: `AddAndProcess` unless the key is at its concurrency limit.
    /// - `Size`: `AddAndProcess` when the batch has a single space left and the key is below
    ///   its concurrency limit.
    /// - `Duration`: when the batch has a single space left, `AddAndProcess` only if the key is
    ///   below its concurrency limit and the `OnFull` behaviour is `Process`; otherwise, for a
    ///   new batch, `AddAndProcessAfter` with the configured duration.
    ///
    /// Every remaining case yields a plain `PreAdd::Add`.
    ///
    /// NOTE(review): under `Duration` the single-space check is evaluated before the new-batch
    /// check, so a batch for which both hold never gets `AddAndProcessAfter`. Presumably that
    /// can happen with a maximum batch size of 1 — confirm against `Batch`.
    pub(crate) fn pre_add<K, I, O, E: Display>(
        &self,
        limits: &Limits,
        batch: &Batch<K, I, O, E>,
    ) -> PreAdd
    where
        K: 'static + Send + Clone,
    {
        let size_cap = limits.max_batch_size;
        // Kept lazy so `processing()` is only queried on the paths that need it.
        let at_concurrency_cap = || batch.processing() >= limits.max_key_concurrency;

        if batch.is_full(size_cap) {
            let reason = if at_concurrency_cap() {
                RejectionReason::MaxConcurrency
            } else {
                RejectionReason::BatchFull
            };
            return PreAdd::Reject(reason);
        }

        match self {
            Self::Immediate => {
                if at_concurrency_cap() {
                    PreAdd::Add
                } else {
                    PreAdd::AddAndProcess
                }
            }
            Self::Size => {
                if batch.has_single_space(size_cap) && !at_concurrency_cap() {
                    PreAdd::AddAndProcess
                } else {
                    PreAdd::Add
                }
            }
            Self::Duration(delay, on_full) => {
                if batch.has_single_space(size_cap) {
                    // This item fills the batch: process now only if allowed to and asked to.
                    if !at_concurrency_cap() && matches!(on_full, OnFull::Process) {
                        PreAdd::AddAndProcess
                    } else {
                        PreAdd::Add
                    }
                } else if batch.is_new_batch() {
                    PreAdd::AddAndProcessAfter(*delay)
                } else {
                    PreAdd::Add
                }
            }
        }
    }

    /// Decides whether `next_batch` should be processed.
    ///
    /// Returns `PostFinish::DoNothing` whenever the key is already at its concurrency limit.
    /// Below the limit, `Immediate` always returns `PostFinish::Process`; every other policy
    /// returns `PostFinish::Process` only if `next_batch` is full.
    pub(crate) fn post_finish<K, I, O, E: Display>(
        &self,
        limits: &Limits,
        next_batch: &Batch<K, I, O, E>,
    ) -> PostFinish {
        if next_batch.processing() >= limits.max_key_concurrency {
            return PostFinish::DoNothing;
        }

        let should_process =
            matches!(self, Self::Immediate) || next_batch.is_full(limits.max_batch_size);

        if should_process {
            PostFinish::Process
        } else {
            PostFinish::DoNothing
        }
    }
}