batch_aint_one/policies.rs

use std::{fmt::Debug, time::Duration};

use crate::{Processor, batch_queue::BatchQueue, error::RejectionReason};

/// A policy controlling when batches get processed.
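///
/// # Examples
///
/// A minimal sketch of constructing the simpler policies. The import path is assumed from the
/// repository name and may differ from the crate's actual re-exports.
///
/// ```ignore
/// use batch_aint_one::BatchingPolicy;
///
/// // Low latency: process as soon as concurrency and resources allow.
/// let low_latency = BatchingPolicy::Immediate;
///
/// // High utilisation: wait until the batch reaches its maximum size.
/// let full_batches = BatchingPolicy::Size;
/// ```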
#[derive(Debug)]
#[non_exhaustive]
pub enum BatchingPolicy {
    /// Immediately process the batch if possible.
    ///
    /// When concurrency and resources are available, new items will be processed immediately (with
    /// a batch size of one).
    ///
    /// When resources are not immediately available, the batch will remain open while acquiring
    /// resources, allowing more items to be added, up to the maximum batch size.
    ///
    /// In this way, we try to prioritise larger batch sizes, while still keeping latency low.
    ///
    /// When concurrency is maximised, new items will be added to the next batch (up to the maximum
    /// batch size). As soon as a batch finishes, the next batch will start. When concurrency is
    /// limited to 1, batches will run serially.
    ///
    /// Prioritises low latency.
    Immediate,

    /// Process the batch when it reaches the maximum size.
    ///
    /// Prioritises high batch utilisation.
    Size,

    /// Process the batch a given duration after it was created.
    ///
    /// Prioritises regularity.
    Duration(Duration, OnFull),
}

/// A policy controlling limits on batch sizes and concurrency.
///
/// New items will be rejected when both limits have been reached.
///
/// `max_key_concurrency * max_batch_size` is both:
///
/// - The maximum number of items that can be processed concurrently.
/// - The maximum number of items that can be queued at once.
///
/// So the total number of items in the system can be up to `2 * max_key_concurrency *
/// max_batch_size`.
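///
/// # Example
///
/// A minimal sketch using the builder methods defined below. With these limits, up to
/// `5 * 50 = 250` items per key can be processing at once, and another 250 can be queued.
///
/// ```ignore
/// let limits = Limits::default()
///     .with_max_batch_size(50)
///     .with_max_key_concurrency(5);
/// ```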
#[derive(Debug, Clone, Copy)]
#[non_exhaustive]
pub struct Limits {
    pub(crate) max_batch_size: usize,
    pub(crate) max_key_concurrency: usize,
}

/// What to do when a batch becomes full.
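///
/// # Example
///
/// A sketch pairing `OnFull` with the `Duration` policy, the only variant in this module that
/// takes it; the import path is assumed from the repository name.
///
/// ```ignore
/// use std::time::Duration;
/// use batch_aint_one::{BatchingPolicy, OnFull};
///
/// // Process 50ms after the batch opens; once it is full, reject further
/// // items until the timer fires.
/// let policy = BatchingPolicy::Duration(Duration::from_millis(50), OnFull::Reject);
/// ```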
#[derive(Debug)]
#[non_exhaustive]
pub enum OnFull {
    /// Immediately attempt to process the batch. If the maximum concurrency has been reached for
    /// the key, the item will be rejected.
    Process,

    /// Reject any additional items. The batch will be processed when another condition is reached.
    Reject,
}

#[derive(Debug)]
pub(crate) enum PreAdd {
    AddAndProcess,
    AddAndAcquireResources,
    AddAndProcessAfter(Duration),
    Reject(RejectionReason),
    Add,
}

pub(crate) enum PostFinish {
    Process,
    DoNothing,
}

impl Limits {
    /// Limits the maximum size of a batch.
    pub fn with_max_batch_size(self, max: usize) -> Self {
        Self {
            max_batch_size: max,
            ..self
        }
    }

    /// Limits the maximum number of batches that can be processed concurrently for a key.
    pub fn with_max_key_concurrency(self, max: usize) -> Self {
        Self {
            max_key_concurrency: max,
            ..self
        }
    }
}

impl Default for Limits {
    fn default() -> Self {
        Self {
            max_batch_size: 100,
            max_key_concurrency: 10,
        }
    }
}

impl BatchingPolicy {
    /// Should be applied _before_ adding the new item to the batch.
    pub(crate) fn pre_add<P: Processor>(&self, batch_queue: &BatchQueue<P>) -> PreAdd {
        if let Some(rejection) = self.should_reject(batch_queue) {
            return PreAdd::Reject(rejection);
        }

        self.determine_action(batch_queue)
    }

    /// Check if the item should be rejected due to capacity constraints.
    fn should_reject<P: Processor>(&self, batch_queue: &BatchQueue<P>) -> Option<RejectionReason> {
        if batch_queue.is_full() {
            if batch_queue.at_max_processing_capacity() {
                Some(RejectionReason::MaxConcurrency)
            } else {
                Some(RejectionReason::BatchFull)
            }
        } else {
            None
        }
    }

    /// Determine the appropriate action based on policy and batch state.
    fn determine_action<P: Processor>(&self, batch_queue: &BatchQueue<P>) -> PreAdd {
        match self {
            Self::Size if batch_queue.last_space_in_batch() => self.add_or_process(batch_queue),

            Self::Duration(_dur, on_full) if batch_queue.last_space_in_batch() => {
                if matches!(on_full, OnFull::Process) {
                    self.add_or_process(batch_queue)
                } else {
                    PreAdd::Add
                }
            }

            Self::Duration(dur, _on_full) if batch_queue.adding_to_new_batch() => {
                PreAdd::AddAndProcessAfter(*dur)
            }

            Self::Immediate if !batch_queue.at_max_processing_capacity() => {
                // We want to process the batch as soon as possible, but we can't process it until
                // we have the resources to do so. So we should acquire the resources first, before
                // starting to process.
                //
                // In the meantime, we should continue adding to the current batch.
                PreAdd::AddAndAcquireResources
            }

            _ => PreAdd::Add,
        }
    }

    /// Decide between `Add` and `AddAndProcess` based on processing capacity.
    fn add_or_process<P: Processor>(&self, batch_queue: &BatchQueue<P>) -> PreAdd {
        if batch_queue.at_max_processing_capacity() {
            // We can't process the batch yet, so just add to it.
            PreAdd::Add
        } else {
            PreAdd::AddAndProcess
        }
    }

    pub(crate) fn post_finish<P: Processor>(&self, batch_queue: &BatchQueue<P>) -> PostFinish {
        if !batch_queue.at_max_processing_capacity() {
            match self {
                BatchingPolicy::Immediate => PostFinish::Process,

                _ => {
                    if batch_queue.is_next_batch_full() {
                        PostFinish::Process
                    } else {
                        PostFinish::DoNothing
                    }
                }
            }
        } else {
            PostFinish::DoNothing
        }
    }
}

#[cfg(test)]
mod tests {
    use tracing::Span;

    use crate::{Processor, batch::BatchItem, batch_queue::BatchQueue};

    use super::*;

    #[derive(Clone)]
    struct TestProcessor;

    impl Processor for TestProcessor {
        type Key = String;
        type Input = String;
        type Output = String;
        type Error = String;
        type Resources = ();

        async fn acquire_resources(&self, _key: String) -> Result<(), String> {
            Ok(())
        }

        async fn process(
            &self,
            _key: String,
            inputs: impl Iterator<Item = String> + Send,
            _resources: (),
        ) -> Result<Vec<String>, String> {
            Ok(inputs.collect())
        }
    }

    fn new_item() -> BatchItem<TestProcessor> {
        let (tx, _rx) = tokio::sync::oneshot::channel();
        BatchItem {
            key: "key".to_string(),
            input: "item1".to_string(),
            tx,
            requesting_span: Span::none(),
        }
    }

    #[test]
    fn limits_builder_methods() {
        let limits = Limits::default()
            .with_max_batch_size(50)
            .with_max_key_concurrency(5);

        assert_eq!(limits.max_batch_size, 50);
        assert_eq!(limits.max_key_concurrency, 5);
    }

    #[test]
    fn size_policy_waits_for_full_batch_when_empty() {
        let limits = Limits::default()
            .with_max_batch_size(3)
            .with_max_key_concurrency(2);
        let queue = BatchQueue::<TestProcessor>::new("test".to_string(), "key".to_string(), limits);

        let policy = BatchingPolicy::Size;
        let result = policy.pre_add(&queue);

        assert!(matches!(result, PreAdd::Add));
    }

    #[test]
    fn immediate_policy_acquires_resources_when_empty() {
        let limits = Limits::default()
            .with_max_batch_size(3)
            .with_max_key_concurrency(2);
        let queue = BatchQueue::<TestProcessor>::new("test".to_string(), "key".to_string(), limits);

        let policy = BatchingPolicy::Immediate;
        let result = policy.pre_add(&queue);

        assert!(matches!(result, PreAdd::AddAndAcquireResources));
    }

    #[test]
    fn duration_policy_schedules_timeout_when_empty() {
        let limits = Limits::default().with_max_batch_size(2);
        let queue = BatchQueue::<TestProcessor>::new("test".to_string(), "key".to_string(), limits);

        let duration = Duration::from_millis(100);
        let policy = BatchingPolicy::Duration(duration, OnFull::Process);
        let result = policy.pre_add(&queue);

        assert!(matches!(result, PreAdd::AddAndProcessAfter(d) if d == duration));
    }

    #[test]
    fn size_policy_processes_when_batch_becomes_full() {
        let limits = Limits::default().with_max_batch_size(2);
        let mut queue =
            BatchQueue::<TestProcessor>::new("test".to_string(), "key".to_string(), limits);

        // Add one item, leaving exactly one space in the batch
        queue.push(new_item());

        let policy = BatchingPolicy::Size;
        let result = policy.pre_add(&queue);

        // Should process when adding the last item
        assert!(matches!(result, PreAdd::AddAndProcess));
    }

    #[tokio::test]
    async fn immediate_policy_adds_when_at_max_capacity() {
        let limits = Limits::default()
            .with_max_batch_size(1)
            .with_max_key_concurrency(1);
        let mut queue =
            BatchQueue::<TestProcessor>::new("test".to_string(), "key".to_string(), limits);

        queue.push(new_item());

        let batch = queue.take_next_batch().unwrap();

        let (on_finished, _rx) = tokio::sync::mpsc::channel(1);
        batch.process(TestProcessor, on_finished);

        let policy = BatchingPolicy::Immediate;
        let result = policy.pre_add(&queue);

        // Should just add, since we can't process any more
        assert!(matches!(result, PreAdd::Add));
    }

    #[tokio::test]
    async fn size_policy_rejects_when_full_and_at_capacity() {
        let limits = Limits::default()
            .with_max_batch_size(1)
            .with_max_key_concurrency(1);
        let mut queue =
            BatchQueue::<TestProcessor>::new("test".to_string(), "key".to_string(), limits);

        // Fill the current batch
        queue.push(new_item());

        // Start processing to reach max processing capacity
        let batch = queue.take_next_batch().unwrap();
        let (on_finished, _rx) = tokio::sync::mpsc::channel(1);
        batch.process(TestProcessor, on_finished);

        // Fill the next batch to reach max queueing capacity
        queue.push(new_item());

        // Now we're full and at capacity - should reject
        let policy = BatchingPolicy::Size;
        let result = policy.pre_add(&queue);

        assert!(matches!(
            result,
            PreAdd::Reject(RejectionReason::MaxConcurrency)
        ));
    }

    #[test]
    fn duration_policy_onfull_reject_rejects_when_full_but_not_processing() {
        let limits = Limits::default()
            .with_max_batch_size(1)
            .with_max_key_concurrency(1);
        let mut queue =
            BatchQueue::<TestProcessor>::new("test".to_string(), "key".to_string(), limits);

        // Fill the batch but don't start processing
        queue.push(new_item());

        // Full but not at processing capacity yet - should still reject as BatchFull
        let policy = BatchingPolicy::Duration(Duration::from_millis(100), OnFull::Reject);
        let result = policy.pre_add(&queue);

        assert!(matches!(result, PreAdd::Reject(RejectionReason::BatchFull)));
    }
}