// batch_aint_one/policies.rs

1use std::{fmt::Debug, time::Duration};
2
3use crate::{Processor, batch_queue::BatchQueue, error::RejectionReason};
4
/// A policy controlling when batches get processed.
#[derive(Debug)]
#[non_exhaustive]
pub enum BatchingPolicy {
    /// Immediately process the batch if possible.
    ///
    /// When concurrency and resources are available, new items will be processed immediately (with
    /// a batch size of one).
    ///
    /// When resources are not immediately available, then the batch will remain open while
    /// acquiring resources to allow more items to be added, up to the maximum batch size.
    ///
    /// In this way, we try to prioritise larger batch sizes, while still keeping latency low.
    ///
    /// When concurrency is maximised, new items will be added to the next batch (up to the maximum
    /// batch size). As soon as a batch finishes the next batch will start. When concurrency is
    /// limited to 1, it will run batches serially.
    ///
    /// Prioritises low latency.
    Immediate,

    /// Process the batch when it reaches the maximum size.
    ///
    /// Prioritises high batch utilisation.
    Size,

    /// Process the batch a given duration after it was created.
    ///
    /// If using `OnFull::Process`, then process the batch when either the duration elapses or the
    /// batch becomes full, whichever happens first.
    ///
    /// Prioritises regularity.
    Duration(Duration, OnFull),
}
39
/// A policy controlling limits on batch sizes and concurrency.
///
/// New items will be rejected when both the limits have been reached.
///
/// `max_key_concurrency * max_batch_size` is both:
///
/// - The number of items that can be processed concurrently.
/// - The number of items that can be queued concurrently.
///
/// So the total number of items in the system can be up to `2 * max_key_concurrency *
/// max_batch_size`.
#[derive(Debug, Clone, Copy)]
#[non_exhaustive]
pub struct Limits {
    /// Maximum number of items in a single batch (default: 100).
    pub(crate) max_batch_size: usize,
    /// Maximum number of batches that may be processed concurrently per key (default: 10).
    pub(crate) max_key_concurrency: usize,
}
57
/// What to do when a batch becomes full.
#[derive(Debug)]
#[non_exhaustive]
pub enum OnFull {
    /// Immediately attempt to process the batch. If the maximum concurrency has been reached for
    /// the key, it will reject.
    Process,

    /// Reject any additional items. The batch will be processed when another condition is reached.
    Reject,
}
69
/// The action a caller should take when adding a new item, as decided by
/// [`BatchingPolicy::pre_add`].
#[derive(Debug)]
pub(crate) enum PreAdd {
    /// Add the item, then process the batch straight away.
    AddAndProcess,
    /// Add the item, then start acquiring processing resources for the batch.
    AddAndAcquireResources,
    /// Add the item, then process the batch after the given delay.
    AddAndProcessAfter(Duration),
    /// Reject the item for the given reason; do not add it.
    Reject(RejectionReason),
    /// Just add the item; some other condition will trigger processing later.
    Add,
}
78
/// The action to take after a batch finishes processing, as decided by
/// [`BatchingPolicy::post_finish`].
///
/// Derives `Debug` for consistency with [`PreAdd`] and to ease logging/diagnostics.
#[derive(Debug)]
pub(crate) enum PostFinish {
    /// Start processing the next queued batch.
    Process,
    /// Leave the queue alone; something else will trigger the next batch.
    DoNothing,
}
83
84impl Limits {
85    /// Limits the maximum size of a batch.
86    pub fn with_max_batch_size(self, max: usize) -> Self {
87        Self {
88            max_batch_size: max,
89            ..self
90        }
91    }
92
93    /// Limits the maximum number of batches that can be processed concurrently for a key.
94    pub fn with_max_key_concurrency(self, max: usize) -> Self {
95        Self {
96            max_key_concurrency: max,
97            ..self
98        }
99    }
100}
101
102impl Default for Limits {
103    fn default() -> Self {
104        Self {
105            max_batch_size: 100,
106            max_key_concurrency: 10,
107        }
108    }
109}
110
impl BatchingPolicy {
    /// Should be applied _before_ adding the new item to the batch.
    ///
    /// First checks capacity-based rejection; if the item is accepted, determines what action
    /// the caller should take alongside adding it.
    pub(crate) fn pre_add<P: Processor>(&self, batch_queue: &BatchQueue<P>) -> PreAdd {
        if let Some(rejection) = self.should_reject(batch_queue) {
            return PreAdd::Reject(rejection);
        }

        self.determine_action(batch_queue)
    }

    /// Check if the item should be rejected due to capacity constraints.
    ///
    /// A full queue always rejects, regardless of policy. The reason distinguishes whether we
    /// are also saturated on processing capacity (`MaxConcurrency`) or merely out of queue
    /// space (`BatchFull`).
    fn should_reject<P: Processor>(&self, batch_queue: &BatchQueue<P>) -> Option<RejectionReason> {
        if batch_queue.is_full() {
            if batch_queue.at_max_processing_capacity() {
                Some(RejectionReason::MaxConcurrency)
            } else {
                Some(RejectionReason::BatchFull)
            }
        } else {
            None
        }
    }

    /// Determine the appropriate action based on policy and batch state.
    ///
    /// NOTE(review): match-arm order matters here — the `last_space_in_batch` guards must be
    /// evaluated before the more general `Duration` arm below them.
    fn determine_action<P: Processor>(&self, batch_queue: &BatchQueue<P>) -> PreAdd {
        match self {
            // Size policy: this item fills the batch, so try to process it right away.
            Self::Size if batch_queue.last_space_in_batch() => self.add_or_process(batch_queue),

            // Duration policy with the batch about to fill: only process early when the
            // configured behaviour is `OnFull::Process`; otherwise wait for the timer.
            Self::Duration(_dur, on_full) if batch_queue.last_space_in_batch() => {
                if matches!(on_full, OnFull::Process) {
                    self.add_or_process(batch_queue)
                } else {
                    PreAdd::Add
                }
            }

            // Duration policy starting a fresh batch: schedule this batch's timer now.
            Self::Duration(dur, _on_full) if batch_queue.adding_to_new_batch() => {
                PreAdd::AddAndProcessAfter(*dur)
            }

            Self::Immediate => {
                if batch_queue.at_max_processing_capacity() {
                    // Can't start another batch yet; queue the item for the next one.
                    PreAdd::Add
                } else {
                    // Start acquiring resources immediately for a new batch
                    if batch_queue.adding_to_new_batch() && !batch_queue.at_max_acquiring_capacity()
                    {
                        PreAdd::AddAndAcquireResources
                    } else {
                        PreAdd::Add
                    }
                }
            }

            // Size/Duration with room to spare: just accumulate the item.
            _ => PreAdd::Add,
        }
    }

    /// Decide between Add and AddAndProcess based on processing capacity.
    fn add_or_process<P: Processor>(&self, batch_queue: &BatchQueue<P>) -> PreAdd {
        if batch_queue.at_max_processing_capacity() {
            // We can't process the batch yet, so just add to it.
            PreAdd::Add
        } else {
            PreAdd::AddAndProcess
        }
    }

    /// Decide what to do after a batch finishes, now that a processing slot may have freed up.
    ///
    /// - `Immediate`: start the next batch whenever capacity allows.
    /// - `Duration`: start the next batch only if its timer has already expired.
    /// - Otherwise (e.g. `Size`): start the next batch only if it is already full.
    pub(crate) fn post_finish<P: Processor>(&self, batch_queue: &BatchQueue<P>) -> PostFinish {
        if !batch_queue.at_max_processing_capacity() {
            match self {
                BatchingPolicy::Immediate => PostFinish::Process,

                BatchingPolicy::Duration(_, _) if batch_queue.has_next_batch_timeout_expired() => {
                    PostFinish::Process
                }

                _ => {
                    if batch_queue.is_next_batch_full() {
                        PostFinish::Process
                    } else {
                        PostFinish::DoNothing
                    }
                }
            }
        } else {
            PostFinish::DoNothing
        }
    }
}
201
#[cfg(test)]
mod tests {
    use tracing::Span;

    use crate::{Processor, batch::BatchItem, batch_queue::BatchQueue};

    use super::*;

    /// Minimal processor that echoes its inputs; used only to parameterise `BatchQueue`.
    #[derive(Clone)]
    struct TestProcessor;

    impl Processor for TestProcessor {
        type Key = String;
        type Input = String;
        type Output = String;
        type Error = String;
        type Resources = ();

        async fn acquire_resources(&self, _key: String) -> Result<(), String> {
            Ok(())
        }

        async fn process(
            &self,
            _key: String,
            inputs: impl Iterator<Item = String> + Send,
            _resources: (),
        ) -> Result<Vec<String>, String> {
            Ok(inputs.collect())
        }
    }

    /// Build a batch item whose result channel is immediately dropped — these tests never
    /// await outputs, only inspect policy decisions.
    fn new_item() -> BatchItem<TestProcessor> {
        let (tx, _rx) = tokio::sync::oneshot::channel();
        BatchItem {
            key: "key".to_string(),
            input: "item1".to_string(),
            tx,
            requesting_span: Span::none(),
        }
    }

    // The builder methods should overwrite only their own field.
    #[test]
    fn limits_builder_methods() {
        let limits = Limits::default()
            .with_max_batch_size(50)
            .with_max_key_concurrency(5);

        assert_eq!(limits.max_batch_size, 50);
        assert_eq!(limits.max_key_concurrency, 5);
    }

    // Size policy with an empty queue: nothing to trigger yet, just accumulate.
    #[test]
    fn size_policy_waits_for_full_batch_when_empty() {
        let limits = Limits::default()
            .with_max_batch_size(3)
            .with_max_key_concurrency(2);
        let queue = BatchQueue::<TestProcessor>::new("test".to_string(), "key".to_string(), limits);

        let policy = BatchingPolicy::Size;
        let result = policy.pre_add(&queue);

        assert!(matches!(result, PreAdd::Add));
    }

    // Immediate policy on a fresh batch should begin acquiring resources right away.
    #[test]
    fn immediate_policy_acquires_resources_when_empty() {
        let limits = Limits::default()
            .with_max_batch_size(3)
            .with_max_key_concurrency(2);
        let queue = BatchQueue::<TestProcessor>::new("test".to_string(), "key".to_string(), limits);

        let policy = BatchingPolicy::Immediate;
        let result = policy.pre_add(&queue);

        assert!(matches!(result, PreAdd::AddAndAcquireResources));
    }

    // Duration policy on a fresh batch should schedule the batch timer with its duration.
    #[test]
    fn duration_policy_schedules_timeout_when_empty() {
        let limits = Limits::default().with_max_batch_size(2);
        let queue = BatchQueue::<TestProcessor>::new("test".to_string(), "key".to_string(), limits);

        let duration = Duration::from_millis(100);
        let policy = BatchingPolicy::Duration(duration, OnFull::Process);
        let result = policy.pre_add(&queue);

        assert!(matches!(result, PreAdd::AddAndProcessAfter(d) if d == duration));
    }

    // Size policy should process as soon as the incoming item would fill the batch.
    #[test]
    fn size_policy_processes_when_batch_becomes_full() {
        let limits = Limits::default().with_max_batch_size(2);
        let mut queue =
            BatchQueue::<TestProcessor>::new("test".to_string(), "key".to_string(), limits);

        // Add one item to make it nearly full
        queue.push(new_item());

        let policy = BatchingPolicy::Size;
        let result = policy.pre_add(&queue);

        // Should process when adding the last item
        assert!(matches!(result, PreAdd::AddAndProcess));
    }

    // Immediate policy while the only processing slot is busy: new items just queue up.
    #[tokio::test]
    async fn immediate_policy_adds_when_at_max_capacity() {
        let limits = Limits::default()
            .with_max_batch_size(1)
            .with_max_key_concurrency(1);
        let mut queue =
            BatchQueue::<TestProcessor>::new("test".to_string(), "key".to_string(), limits);

        queue.push(new_item());

        let batch = queue.take_next_processable_batch().unwrap();

        let (on_finished, _rx) = tokio::sync::mpsc::channel(1);
        batch.process(TestProcessor, on_finished);

        let policy = BatchingPolicy::Immediate;

        let result = policy.pre_add(&queue);
        assert!(matches!(result, PreAdd::Add));
    }

    // Queue full AND at processing capacity: rejection reason is MaxConcurrency.
    #[tokio::test]
    async fn size_policy_rejects_when_full_and_at_capacity() {
        let limits = Limits::default()
            .with_max_batch_size(1)
            .with_max_key_concurrency(1);
        let mut queue =
            BatchQueue::<TestProcessor>::new("test".to_string(), "key".to_string(), limits);

        // Fill the current batch
        queue.push(new_item());

        // Start processing to reach max processing capacity
        let batch = queue.take_next_processable_batch().unwrap();
        let (on_finished, _rx) = tokio::sync::mpsc::channel(1);
        batch.process(TestProcessor, on_finished);

        // Fill the next batch to reach max queueing capacity
        queue.push(new_item());

        // Now we're full and at capacity - should reject
        let policy = BatchingPolicy::Size;
        let result = policy.pre_add(&queue);

        assert!(matches!(
            result,
            PreAdd::Reject(RejectionReason::MaxConcurrency)
        ));
    }

    // Queue full but nothing processing yet: rejection reason is BatchFull.
    #[test]
    fn duration_policy_onfull_reject_rejects_when_full_but_not_processing() {
        let limits = Limits::default()
            .with_max_batch_size(1)
            .with_max_key_concurrency(1);
        let mut queue =
            BatchQueue::<TestProcessor>::new("test".to_string(), "key".to_string(), limits);

        // Fill the batch but don't start processing
        queue.push(new_item());

        // Full but not at processing capacity yet - should still reject as BatchFull
        let policy = BatchingPolicy::Duration(Duration::from_millis(100), OnFull::Reject);
        let result = policy.pre_add(&queue);

        assert!(matches!(result, PreAdd::Reject(RejectionReason::BatchFull)));
    }
}
375}