batch_aint_one/policies.rs
1use std::{
2 fmt::{Debug, Display},
3 time::Duration,
4};
5
6use crate::{batch_queue::BatchQueue, error::RejectionReason};
7
/// A policy controlling when batches get processed.
#[derive(Debug)]
#[non_exhaustive]
pub enum BatchingPolicy {
    /// Immediately process the batch if possible.
    ///
    /// When concurrency is available, new items will be processed immediately (with a batch size of
    /// one).
    ///
    /// When concurrency is maximised, new items will be added to the next batch (up to the maximum
    /// batch size). As soon as a batch finishes the next batch will start. When concurrency is
    /// limited to 1, it will run batches serially.
    ///
    /// Prioritises low latency.
    Immediate,

    /// Process the batch when it reaches the maximum size.
    ///
    /// Prioritises high batch utilisation.
    Size,

    /// Process the batch a given duration after it was created.
    ///
    /// The [`OnFull`] value controls what happens when the batch becomes full before that duration
    /// has elapsed.
    ///
    /// Prioritises regularity.
    Duration(Duration, OnFull),
}
34
/// A policy controlling limits on batch sizes and concurrency.
///
/// New items will be rejected when both limits have been reached.
///
/// `max_key_concurrency * max_batch_size` is both:
///
/// - The number of items that can be processed concurrently.
/// - The number of items that can be queued concurrently.
///
/// So the total number of items in the system can be up to `2 * max_key_concurrency *
/// max_batch_size`.
#[derive(Debug, Clone, Copy)]
#[non_exhaustive]
pub struct Limits {
    /// The maximum size of a batch.
    pub(crate) max_batch_size: usize,
    /// The maximum number of batches that can be processed concurrently for a key.
    pub(crate) max_key_concurrency: usize,
}
52
/// What to do when a batch becomes full.
#[derive(Debug)]
#[non_exhaustive]
pub enum OnFull {
    /// Immediately attempt to process the batch. If the maximum concurrency has been reached for
    /// the key, it will reject.
    Process,

    /// Reject any additional items. The batch will be processed when another condition is reached.
    Reject,
}
64
/// The decision returned by `BatchingPolicy::pre_add` for an item that is about to be added.
///
/// How each variant is acted upon is up to the caller of `pre_add`.
pub enum PreAdd {
    /// Returned when the policy wants the batch processed as soon as the item has been added, e.g.
    /// `BatchingPolicy::Immediate` while processing capacity is still available.
    AddAndProcess,
    /// Returned by `BatchingPolicy::Duration` for an item that starts a new batch. Carries the
    /// duration configured on the policy.
    AddAndProcessAfter(Duration),
    /// Returned when the queue is full. Carries the reason for the rejection.
    Reject(RejectionReason),
    /// Returned in every other case: the item is added and this decision requests no processing.
    Add,
}
71
/// The decision returned by `BatchingPolicy::post_finish` once a batch has finished.
///
/// The enum carries no data, so it is cheap to copy and can be compared and debug-printed
/// directly (handy in logs and assertions).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PostFinish {
    /// The next batch should be processed now.
    Process,
    /// Nothing needs to happen right now.
    DoNothing,
}
76
77impl Limits {
78 /// Limits the maximum size of a batch.
79 pub fn max_batch_size(self, max: usize) -> Self {
80 Self {
81 max_batch_size: max,
82 ..self
83 }
84 }
85
86 /// Limits the maximum number of batches that can be processed concurrently for a key.
87 pub fn max_key_concurrency(self, max: usize) -> Self {
88 Self {
89 max_key_concurrency: max,
90 ..self
91 }
92 }
93}
94
95impl Default for Limits {
96 fn default() -> Self {
97 Self {
98 max_batch_size: 100,
99 max_key_concurrency: 10,
100 }
101 }
102}
103
104impl BatchingPolicy {
105 /// Should be applied _before_ adding the new item to the batch.
106 pub(crate) fn pre_add<K, I, O, E: Display>(
107 &self,
108 batch_queue: &BatchQueue<K, I, O, E>,
109 ) -> PreAdd
110 where
111 K: 'static + Send + Clone,
112 {
113 // Check if we have capacity to process this item.
114 if batch_queue.is_full() {
115 if batch_queue.at_max_processing_capacity() {
116 return PreAdd::Reject(RejectionReason::MaxConcurrency);
117 } else {
118 // We might still be waiting to process the next batch.
119 return PreAdd::Reject(RejectionReason::BatchFull);
120 }
121 }
122
123 match self {
124 Self::Size if batch_queue.last_space_in_batch() => {
125 if batch_queue.at_max_processing_capacity() {
126 PreAdd::Add
127 } else {
128 PreAdd::AddAndProcess
129 }
130 }
131
132 Self::Duration(_dur, on_full) if batch_queue.last_space_in_batch() => {
133 if batch_queue.at_max_processing_capacity() {
134 PreAdd::Add
135 } else if matches!(on_full, OnFull::Process) {
136 PreAdd::AddAndProcess
137 } else {
138 PreAdd::Add
139 }
140 }
141
142 Self::Duration(dur, _on_full) if batch_queue.adding_to_new_batch() => {
143 PreAdd::AddAndProcessAfter(*dur)
144 }
145
146 Self::Immediate if !batch_queue.at_max_processing_capacity() => PreAdd::AddAndProcess,
147
148 _ => PreAdd::Add,
149 }
150 }
151
152 pub(crate) fn post_finish<K, I, O, E: Display>(
153 &self,
154 batch_queue: &BatchQueue<K, I, O, E>,
155 ) -> PostFinish {
156 if !batch_queue.at_max_processing_capacity() {
157 match self {
158 BatchingPolicy::Immediate => PostFinish::Process,
159
160 _ => {
161 if batch_queue.is_next_batch_full() {
162 PostFinish::Process
163 } else {
164 PostFinish::DoNothing
165 }
166 }
167 }
168 } else {
169 PostFinish::DoNothing
170 }
171 }
172}