batch_aint_one/
policies.rs1use std::{fmt::Debug, time::Duration};
2
3use crate::{Processor, batch_queue::BatchQueue, error::RejectionReason};
4
/// Controls when a queued batch of items is submitted for processing.
#[derive(Debug)]
#[non_exhaustive]
pub enum BatchingPolicy {
    /// Process a batch as soon as processing capacity allows, without
    /// waiting for it to fill; resources are acquired eagerly for a new
    /// batch when below the acquiring limit.
    Immediate,

    /// Process a batch only once it reaches the maximum batch size.
    Size,

    /// Process a batch a fixed duration after its first item is added.
    /// The [`OnFull`] value controls what happens if the batch fills up
    /// before the timer fires.
    Duration(Duration, OnFull),
}
39
/// Limits on batch size and per-key processing concurrency.
#[derive(Debug, Clone, Copy)]
#[non_exhaustive]
pub struct Limits {
    /// Maximum number of items a single batch may hold (default: 100).
    pub(crate) max_batch_size: usize,
    /// Maximum number of batches for one key that may be in flight
    /// concurrently (default: 10).
    pub(crate) max_key_concurrency: usize,
}
57
/// What to do when a batch fills up before its scheduled processing time.
///
/// Used by [`BatchingPolicy::Duration`].
#[derive(Debug)]
#[non_exhaustive]
pub enum OnFull {
    /// Process the full batch immediately, capacity permitting.
    Process,

    /// Do not process early; the item completing the batch is still added,
    /// and subsequent items are rejected while the batch remains full
    /// (see `BatchingPolicy::should_reject`).
    Reject,
}
69
/// Action the queue should take for an item that is about to be added.
#[derive(Debug)]
pub(crate) enum PreAdd {
    /// Add the item and process its batch immediately.
    AddAndProcess,
    /// Add the item and start acquiring processor resources for its batch.
    AddAndAcquireResources,
    /// Add the item and schedule the batch to be processed after the delay.
    AddAndProcessAfter(Duration),
    /// Do not add the item; reject it for the given reason.
    Reject(RejectionReason),
    /// Just add the item to the batch.
    Add,
}
78
/// Action to take after a batch finishes processing.
// Derives `Debug` for consistency with the sibling `PreAdd` enum, so
// policy decisions can be logged and inspected in tests.
#[derive(Debug)]
pub(crate) enum PostFinish {
    /// Capacity freed up and the next batch is ready: process it.
    Process,
    /// Nothing to do.
    DoNothing,
}
83
84impl Limits {
85 pub fn with_max_batch_size(self, max: usize) -> Self {
87 Self {
88 max_batch_size: max,
89 ..self
90 }
91 }
92
93 pub fn with_max_key_concurrency(self, max: usize) -> Self {
95 Self {
96 max_key_concurrency: max,
97 ..self
98 }
99 }
100}
101
102impl Default for Limits {
103 fn default() -> Self {
104 Self {
105 max_batch_size: 100,
106 max_key_concurrency: 10,
107 }
108 }
109}
110
111impl BatchingPolicy {
112 pub(crate) fn pre_add<P: Processor>(&self, batch_queue: &BatchQueue<P>) -> PreAdd {
114 if let Some(rejection) = self.should_reject(batch_queue) {
115 return PreAdd::Reject(rejection);
116 }
117
118 self.determine_action(batch_queue)
119 }
120
121 fn should_reject<P: Processor>(&self, batch_queue: &BatchQueue<P>) -> Option<RejectionReason> {
123 if batch_queue.is_full() {
124 if batch_queue.at_max_processing_capacity() {
125 Some(RejectionReason::MaxConcurrency)
126 } else {
127 Some(RejectionReason::BatchFull)
128 }
129 } else {
130 None
131 }
132 }
133
134 fn determine_action<P: Processor>(&self, batch_queue: &BatchQueue<P>) -> PreAdd {
136 match self {
137 Self::Size if batch_queue.last_space_in_batch() => self.add_or_process(batch_queue),
138
139 Self::Duration(_dur, on_full) if batch_queue.last_space_in_batch() => {
140 if matches!(on_full, OnFull::Process) {
141 self.add_or_process(batch_queue)
142 } else {
143 PreAdd::Add
144 }
145 }
146
147 Self::Duration(dur, _on_full) if batch_queue.adding_to_new_batch() => {
148 PreAdd::AddAndProcessAfter(*dur)
149 }
150
151 Self::Immediate => {
152 if batch_queue.at_max_processing_capacity() {
153 PreAdd::Add
154 } else {
155 if batch_queue.adding_to_new_batch() && !batch_queue.at_max_acquiring_capacity()
157 {
158 PreAdd::AddAndAcquireResources
159 } else {
160 PreAdd::Add
161 }
162 }
163 }
164
165 _ => PreAdd::Add,
166 }
167 }
168
169 fn add_or_process<P: Processor>(&self, batch_queue: &BatchQueue<P>) -> PreAdd {
171 if batch_queue.at_max_processing_capacity() {
172 PreAdd::Add
174 } else {
175 PreAdd::AddAndProcess
176 }
177 }
178
179 pub(crate) fn post_finish<P: Processor>(&self, batch_queue: &BatchQueue<P>) -> PostFinish {
180 if !batch_queue.at_max_processing_capacity() {
181 match self {
182 BatchingPolicy::Immediate => PostFinish::Process,
183
184 BatchingPolicy::Duration(_, _) if batch_queue.has_next_batch_timeout_expired() => {
185 PostFinish::Process
186 }
187
188 _ => {
189 if batch_queue.is_next_batch_full() {
190 PostFinish::Process
191 } else {
192 PostFinish::DoNothing
193 }
194 }
195 }
196 } else {
197 PostFinish::DoNothing
198 }
199 }
200}
201
#[cfg(test)]
mod tests {
    use tracing::Span;

    use crate::{Processor, batch::BatchItem, batch_queue::BatchQueue};

    use super::*;

    /// Minimal [`Processor`] whose `process` echoes its inputs back.
    #[derive(Clone)]
    struct TestProcessor;

    impl Processor for TestProcessor {
        type Key = String;
        type Input = String;
        type Output = String;
        type Error = String;
        type Resources = ();

        async fn acquire_resources(&self, _key: String) -> Result<(), String> {
            Ok(())
        }

        async fn process(
            &self,
            _key: String,
            inputs: impl Iterator<Item = String> + Send,
            _resources: (),
        ) -> Result<Vec<String>, String> {
            Ok(inputs.collect())
        }
    }

    /// Builds one batch item for the fixed key "key". The receiver half of
    /// the result channel is dropped: these tests never await outputs.
    fn new_item() -> BatchItem<TestProcessor> {
        let (tx, _rx) = tokio::sync::oneshot::channel();
        BatchItem {
            key: "key".to_string(),
            input: "item1".to_string(),
            tx,
            requesting_span: Span::none(),
        }
    }

    #[test]
    fn limits_builder_methods() {
        let limits = Limits::default()
            .with_max_batch_size(50)
            .with_max_key_concurrency(5);

        assert_eq!(limits.max_batch_size, 50);
        assert_eq!(limits.max_key_concurrency, 5);
    }

    #[test]
    fn size_policy_waits_for_full_batch_when_empty() {
        let limits = Limits::default()
            .with_max_batch_size(3)
            .with_max_key_concurrency(2);
        let queue = BatchQueue::<TestProcessor>::new("test".to_string(), "key".to_string(), limits);

        // Empty queue: a Size policy should only accumulate the item.
        let policy = BatchingPolicy::Size;
        let result = policy.pre_add(&queue);

        assert!(matches!(result, PreAdd::Add));
    }

    #[test]
    fn immediate_policy_acquires_resources_when_empty() {
        let limits = Limits::default()
            .with_max_batch_size(3)
            .with_max_key_concurrency(2);
        let queue = BatchQueue::<TestProcessor>::new("test".to_string(), "key".to_string(), limits);

        // First item of a new batch with spare capacity: resources should
        // be acquired eagerly.
        let policy = BatchingPolicy::Immediate;
        let result = policy.pre_add(&queue);

        assert!(matches!(result, PreAdd::AddAndAcquireResources));
    }

    #[test]
    fn duration_policy_schedules_timeout_when_empty() {
        let limits = Limits::default().with_max_batch_size(2);
        let queue = BatchQueue::<TestProcessor>::new("test".to_string(), "key".to_string(), limits);

        // First item of a new batch: processing should be scheduled after
        // the configured delay.
        let duration = Duration::from_millis(100);
        let policy = BatchingPolicy::Duration(duration, OnFull::Process);
        let result = policy.pre_add(&queue);

        assert!(matches!(result, PreAdd::AddAndProcessAfter(d) if d == duration));
    }

    #[test]
    fn size_policy_processes_when_batch_becomes_full() {
        let limits = Limits::default().with_max_batch_size(2);
        let mut queue =
            BatchQueue::<TestProcessor>::new("test".to_string(), "key".to_string(), limits);

        // One item already queued; the next item takes the last space.
        queue.push(new_item());

        let policy = BatchingPolicy::Size;
        let result = policy.pre_add(&queue);

        assert!(matches!(result, PreAdd::AddAndProcess));
    }

    #[tokio::test]
    async fn immediate_policy_adds_when_at_max_capacity() {
        let limits = Limits::default()
            .with_max_batch_size(1)
            .with_max_key_concurrency(1);
        let mut queue =
            BatchQueue::<TestProcessor>::new("test".to_string(), "key".to_string(), limits);

        queue.push(new_item());

        // Take the only allowed batch and start processing it, so the key
        // is at its max processing capacity.
        let batch = queue.take_next_processable_batch().unwrap();

        let (on_finished, _rx) = tokio::sync::mpsc::channel(1);
        batch.process(TestProcessor, on_finished);

        let policy = BatchingPolicy::Immediate;

        // With no capacity left, Immediate falls back to a plain Add.
        let result = policy.pre_add(&queue);
        assert!(matches!(result, PreAdd::Add));
    }

    #[tokio::test]
    async fn size_policy_rejects_when_full_and_at_capacity() {
        let limits = Limits::default()
            .with_max_batch_size(1)
            .with_max_key_concurrency(1);
        let mut queue =
            BatchQueue::<TestProcessor>::new("test".to_string(), "key".to_string(), limits);

        queue.push(new_item());

        // Occupy the single processing slot.
        let batch = queue.take_next_processable_batch().unwrap();
        let (on_finished, _rx) = tokio::sync::mpsc::channel(1);
        batch.process(TestProcessor, on_finished);

        // Fill the next batch while the first is still processing.
        queue.push(new_item());

        let policy = BatchingPolicy::Size;
        let result = policy.pre_add(&queue);

        // Full batch + no processing capacity => MaxConcurrency rejection.
        assert!(matches!(
            result,
            PreAdd::Reject(RejectionReason::MaxConcurrency)
        ));
    }

    #[test]
    fn duration_policy_onfull_reject_rejects_when_full_but_not_processing() {
        let limits = Limits::default()
            .with_max_batch_size(1)
            .with_max_key_concurrency(1);
        let mut queue =
            BatchQueue::<TestProcessor>::new("test".to_string(), "key".to_string(), limits);

        // Fill the batch; nothing is processing yet.
        queue.push(new_item());

        let policy = BatchingPolicy::Duration(Duration::from_millis(100), OnFull::Reject);
        let result = policy.pre_add(&queue);

        // Full batch with capacity still available => BatchFull rejection.
        assert!(matches!(result, PreAdd::Reject(RejectionReason::BatchFull)));
    }
}