use std::{fmt::Debug, time::Duration};

use crate::{Processor, batch_queue::BatchQueue, error::RejectionReason};

/// Controls when the queued items for a key are submitted for processing.
#[derive(Debug)]
#[non_exhaustive]
pub enum BatchingPolicy {
    /// Process items as soon as possible: resources are acquired when an item
    /// is added, subject to the key's concurrency limit.
    Immediate,

    /// Process a batch once it reaches the configured maximum size.
    Size,

    /// Process a batch once the given duration has elapsed since its first
    /// item was added; [`OnFull`] controls what happens if the batch fills up
    /// before the timer fires.
    Duration(Duration, OnFull),
}
36
/// Per-key limits on batch size and processing concurrency.
///
/// Construct via [`Limits::default`] and adjust with the `with_*` builder
/// methods.
#[derive(Debug, Clone, Copy)]
#[non_exhaustive]
pub struct Limits {
    // Maximum number of items in a single batch (default: 100).
    pub(crate) max_batch_size: usize,
    // Maximum number of batches that may be processing concurrently for a
    // single key (default: 10).
    pub(crate) max_key_concurrency: usize,
}
54
/// What a [`BatchingPolicy::Duration`] policy does when a batch fills up
/// before its timer fires.
#[derive(Debug)]
#[non_exhaustive]
pub enum OnFull {
    /// Process the full batch immediately (capacity permitting).
    Process,

    /// Leave the batch to its timer; once it is full, further items for the
    /// key are rejected.
    Reject,
}
66
/// Action the queue should take for a new item, as decided by
/// [`BatchingPolicy::pre_add`].
#[derive(Debug)]
pub(crate) enum PreAdd {
    /// Add the item and process the batch right away.
    AddAndProcess,
    /// Add the item and begin acquiring processing resources.
    AddAndAcquireResources,
    /// Add the item and schedule the batch to be processed after the delay.
    AddAndProcessAfter(Duration),
    /// Do not add the item; reject it for the given reason.
    Reject(RejectionReason),
    /// Add the item and take no further action.
    Add,
}
75
/// Action to take after a batch finishes processing, as decided by
/// [`BatchingPolicy::post_finish`].
//
// Derives `Debug` for consistency with the sibling `PreAdd` decision enum,
// so decisions can be logged/asserted uniformly.
#[derive(Debug)]
pub(crate) enum PostFinish {
    /// Immediately process the next queued batch for this key.
    Process,
    /// Take no further action.
    DoNothing,
}
80
81impl Limits {
82 pub fn with_max_batch_size(self, max: usize) -> Self {
84 Self {
85 max_batch_size: max,
86 ..self
87 }
88 }
89
90 pub fn with_max_key_concurrency(self, max: usize) -> Self {
92 Self {
93 max_key_concurrency: max,
94 ..self
95 }
96 }
97}
98
impl Default for Limits {
    /// Default limits: batches of up to 100 items, with at most 10 batches
    /// processing concurrently per key.
    fn default() -> Self {
        Self {
            max_batch_size: 100,
            max_key_concurrency: 10,
        }
    }
}
107
impl BatchingPolicy {
    /// Decides what to do with a new item before it is added to
    /// `batch_queue`.
    ///
    /// Rejection (queue full, possibly also at the concurrency limit) is
    /// checked first; otherwise the policy-specific action is chosen.
    pub(crate) fn pre_add<P: Processor>(&self, batch_queue: &BatchQueue<P>) -> PreAdd {
        if let Some(rejection) = self.should_reject(batch_queue) {
            return PreAdd::Reject(rejection);
        }

        self.determine_action(batch_queue)
    }

    /// Returns the reason a new item must be rejected, or `None` if it can
    /// be accepted.
    ///
    /// Only a full queue causes rejection; the reason distinguishes whether
    /// the key is also at its processing-concurrency limit.
    fn should_reject<P: Processor>(&self, batch_queue: &BatchQueue<P>) -> Option<RejectionReason> {
        if batch_queue.is_full() {
            if batch_queue.at_max_processing_capacity() {
                Some(RejectionReason::MaxConcurrency)
            } else {
                Some(RejectionReason::BatchFull)
            }
        } else {
            None
        }
    }

    /// Chooses the policy-specific action for an item that will be accepted.
    ///
    /// NOTE(review): the match guards are order-sensitive — the
    /// "last space in batch" arms must be tried before the `Duration`
    /// "new batch" arm.
    fn determine_action<P: Processor>(&self, batch_queue: &BatchQueue<P>) -> PreAdd {
        match self {
            // Size policy: this item fills the batch, so process if capacity
            // allows.
            Self::Size if batch_queue.last_space_in_batch() => self.add_or_process(batch_queue),

            // Duration policy, batch about to fill: process early only when
            // configured with `OnFull::Process`; otherwise just add and let
            // the scheduled timer handle it.
            Self::Duration(_dur, on_full) if batch_queue.last_space_in_batch() => {
                if matches!(on_full, OnFull::Process) {
                    self.add_or_process(batch_queue)
                } else {
                    PreAdd::Add
                }
            }

            // Duration policy, first item of a fresh batch: schedule the
            // batch to be processed after the configured delay.
            Self::Duration(dur, _on_full) if batch_queue.adding_to_new_batch() => {
                PreAdd::AddAndProcessAfter(*dur)
            }

            // Immediate policy: start acquiring resources right away while
            // the key is below its concurrency limit.
            Self::Immediate if !batch_queue.at_max_processing_capacity() => {
                PreAdd::AddAndAcquireResources
            }

            // Everything else: just enqueue the item.
            _ => PreAdd::Add,
        }
    }

    /// Add-and-process when below the concurrency limit; otherwise just add.
    fn add_or_process<P: Processor>(&self, batch_queue: &BatchQueue<P>) -> PreAdd {
        if batch_queue.at_max_processing_capacity() {
            PreAdd::Add
        } else {
            PreAdd::AddAndProcess
        }
    }

    /// Decides, after a batch for this key finishes, whether the next queued
    /// batch should be processed immediately.
    pub(crate) fn post_finish<P: Processor>(&self, batch_queue: &BatchQueue<P>) -> PostFinish {
        if !batch_queue.at_max_processing_capacity() {
            match self {
                // Immediate: drain the queue as fast as capacity allows.
                BatchingPolicy::Immediate => PostFinish::Process,

                // Size/Duration: only start the next batch early if it is
                // already full; otherwise leave it to its own trigger.
                _ => {
                    if batch_queue.is_next_batch_full() {
                        PostFinish::Process
                    } else {
                        PostFinish::DoNothing
                    }
                }
            }
        } else {
            PostFinish::DoNothing
        }
    }
}
189
#[cfg(test)]
mod tests {
    use tracing::Span;

    use crate::{Processor, batch::BatchItem, batch_queue::BatchQueue};

    use super::*;

    // Minimal processor for driving `BatchQueue` in tests; echoes its inputs
    // back as outputs.
    #[derive(Clone)]
    struct TestProcessor;

    impl Processor for TestProcessor {
        type Key = String;
        type Input = String;
        type Output = String;
        type Error = String;
        type Resources = ();

        async fn acquire_resources(&self, _key: String) -> Result<(), String> {
            Ok(())
        }

        async fn process(
            &self,
            _key: String,
            inputs: impl Iterator<Item = String> + Send,
            _resources: (),
        ) -> Result<Vec<String>, String> {
            Ok(inputs.collect())
        }
    }

    /// Builds a batch item for the fixed key "key". The receiver half of the
    /// result channel is dropped immediately — these tests never await
    /// outputs.
    fn new_item() -> BatchItem<TestProcessor> {
        let (tx, _rx) = tokio::sync::oneshot::channel();
        BatchItem {
            key: "key".to_string(),
            input: "item1".to_string(),
            tx,
            requesting_span: Span::none(),
        }
    }

    // Builder methods should override the corresponding default fields.
    #[test]
    fn limits_builder_methods() {
        let limits = Limits::default()
            .with_max_batch_size(50)
            .with_max_key_concurrency(5);

        assert_eq!(limits.max_batch_size, 50);
        assert_eq!(limits.max_key_concurrency, 5);
    }

    // Size policy on an empty queue: the item is not the last space, so it
    // should simply be added.
    #[test]
    fn size_policy_waits_for_full_batch_when_empty() {
        let limits = Limits::default()
            .with_max_batch_size(3)
            .with_max_key_concurrency(2);
        let queue = BatchQueue::<TestProcessor>::new("test".to_string(), "key".to_string(), limits);

        let policy = BatchingPolicy::Size;
        let result = policy.pre_add(&queue);

        assert!(matches!(result, PreAdd::Add));
    }

    // Immediate policy on an empty queue: below the concurrency limit, so
    // resource acquisition should start right away.
    #[test]
    fn immediate_policy_acquires_resources_when_empty() {
        let limits = Limits::default()
            .with_max_batch_size(3)
            .with_max_key_concurrency(2);
        let queue = BatchQueue::<TestProcessor>::new("test".to_string(), "key".to_string(), limits);

        let policy = BatchingPolicy::Immediate;
        let result = policy.pre_add(&queue);

        assert!(matches!(result, PreAdd::AddAndAcquireResources));
    }

    // Duration policy on an empty queue: the first item of a new batch should
    // schedule processing after the configured delay.
    #[test]
    fn duration_policy_schedules_timeout_when_empty() {
        let limits = Limits::default().with_max_batch_size(2);
        let queue = BatchQueue::<TestProcessor>::new("test".to_string(), "key".to_string(), limits);

        let duration = Duration::from_millis(100);
        let policy = BatchingPolicy::Duration(duration, OnFull::Process);
        let result = policy.pre_add(&queue);

        assert!(matches!(result, PreAdd::AddAndProcessAfter(d) if d == duration));
    }

    // Size policy when the new item takes the last space (batch size 2, one
    // item queued): the batch should be processed.
    #[test]
    fn size_policy_processes_when_batch_becomes_full() {
        let limits = Limits::default().with_max_batch_size(2);
        let mut queue =
            BatchQueue::<TestProcessor>::new("test".to_string(), "key".to_string(), limits);

        queue.push(new_item());

        let policy = BatchingPolicy::Size;
        let result = policy.pre_add(&queue);

        assert!(matches!(result, PreAdd::AddAndProcess));
    }

    // Immediate policy while a batch is already processing at the concurrency
    // limit: the item is only added, not processed.
    #[tokio::test]
    async fn immediate_policy_adds_when_at_max_capacity() {
        let limits = Limits::default()
            .with_max_batch_size(1)
            .with_max_key_concurrency(1);
        let mut queue =
            BatchQueue::<TestProcessor>::new("test".to_string(), "key".to_string(), limits);

        queue.push(new_item());

        // Start processing the only batch, putting the key at capacity.
        let batch = queue.take_next_batch().unwrap();

        let (on_finished, _rx) = tokio::sync::mpsc::channel(1);
        batch.process(TestProcessor, on_finished);

        let policy = BatchingPolicy::Immediate;
        let result = policy.pre_add(&queue);

        assert!(matches!(result, PreAdd::Add));
    }

    // Size policy when the queue is full AND at the concurrency limit: the
    // item is rejected with `MaxConcurrency`.
    #[tokio::test]
    async fn size_policy_rejects_when_full_and_at_capacity() {
        let limits = Limits::default()
            .with_max_batch_size(1)
            .with_max_key_concurrency(1);
        let mut queue =
            BatchQueue::<TestProcessor>::new("test".to_string(), "key".to_string(), limits);

        queue.push(new_item());

        // Occupy the single processing slot...
        let batch = queue.take_next_batch().unwrap();
        let (on_finished, _rx) = tokio::sync::mpsc::channel(1);
        batch.process(TestProcessor, on_finished);

        // ...and fill the queue again.
        queue.push(new_item());

        let policy = BatchingPolicy::Size;
        let result = policy.pre_add(&queue);

        assert!(matches!(
            result,
            PreAdd::Reject(RejectionReason::MaxConcurrency)
        ));
    }

    // Duration policy with `OnFull::Reject` when the queue is full but
    // nothing is processing: the item is rejected with `BatchFull`.
    #[test]
    fn duration_policy_onfull_reject_rejects_when_full_but_not_processing() {
        let limits = Limits::default()
            .with_max_batch_size(1)
            .with_max_key_concurrency(1);
        let mut queue =
            BatchQueue::<TestProcessor>::new("test".to_string(), "key".to_string(), limits);

        queue.push(new_item());

        let policy = BatchingPolicy::Duration(Duration::from_millis(100), OnFull::Reject);
        let result = policy.pre_add(&queue);

        assert!(matches!(result, PreAdd::Reject(RejectionReason::BatchFull)));
    }
}