// batch_aint_one/policies.rs

use std::{
    fmt::{Debug, Display},
    time::Duration,
};

use crate::{batch_queue::BatchQueue, error::RejectionReason};
7
/// A policy controlling when batches get processed.
#[derive(Debug)]
#[non_exhaustive]
pub enum BatchingPolicy {
    /// Immediately process the batch if possible.
    ///
    /// When concurrency is available, new items will be processed immediately (with a batch size of
    /// one).
    ///
    /// When concurrency is maximised, new items will be added to the next batch (up to the maximum
    /// batch size). As soon as a batch finishes the next batch will start. When concurrency is
    /// limited to 1, it will run batches serially.
    ///
    /// Prioritises low latency.
    Immediate,

    /// Process the batch when it reaches the maximum size.
    ///
    /// Prioritises high batch utilisation.
    Size,

    /// Process the batch a given duration after it was created.
    ///
    /// The [`OnFull`] value controls what happens when the batch fills up before the
    /// duration has elapsed.
    ///
    /// Prioritises regularity.
    Duration(Duration, OnFull),
}
34
/// A policy controlling limits on batch sizes and concurrency.
///
/// New items will be rejected when both the limits have been reached.
///
/// `max_key_concurrency * max_batch_size` is both:
///
/// - The number of items that can be processed concurrently.
/// - The number of items that can be queued concurrently.
///
/// So the total number of items in the system can be up to `2 * max_key_concurrency *
/// max_batch_size`.
#[derive(Debug, Clone, Copy)]
#[non_exhaustive]
pub struct Limits {
    // Maximum number of items in a single batch (defaults to 100, see `Default`).
    pub(crate) max_batch_size: usize,
    // Maximum number of batches processed concurrently per key (defaults to 10, see `Default`).
    pub(crate) max_key_concurrency: usize,
}
52
/// What to do when a batch becomes full.
#[derive(Debug)]
#[non_exhaustive]
pub enum OnFull {
    /// Immediately attempt to process the batch. If the maximum concurrency has been reached for
    /// the key, it will reject.
    Process,

    /// Reject any additional items. The batch will be processed when another condition is reached.
    Reject,
}
64
/// The action to take for a new item, decided by [`BatchingPolicy::pre_add`]
/// before the item is added to a batch.
pub enum PreAdd {
    /// Add the item and start processing the batch straight away.
    AddAndProcess,
    /// Add the item and schedule the batch to be processed after the given delay.
    AddAndProcessAfter(Duration),
    /// Reject the item for the given reason; it is not added.
    Reject(RejectionReason),
    /// Add the item; the batch will be processed when some other condition is met.
    Add,
}
71
/// The action to take after a batch finishes, decided by [`BatchingPolicy::post_finish`].
// Derives `Debug` for consistency with the other public types in this module.
#[derive(Debug)]
pub enum PostFinish {
    /// Start processing the next batch.
    Process,
    /// Take no further action.
    DoNothing,
}
76
77impl Limits {
78    /// Limits the maximum size of a batch.
79    pub fn max_batch_size(self, max: usize) -> Self {
80        Self {
81            max_batch_size: max,
82            ..self
83        }
84    }
85
86    /// Limits the maximum number of batches that can be processed concurrently for a key.
87    pub fn max_key_concurrency(self, max: usize) -> Self {
88        Self {
89            max_key_concurrency: max,
90            ..self
91        }
92    }
93}
94
impl Default for Limits {
    /// Default limits: batches of up to 100 items, with at most 10 batches
    /// processed concurrently per key.
    fn default() -> Self {
        Self {
            max_batch_size: 100,
            max_key_concurrency: 10,
        }
    }
}
103
104impl BatchingPolicy {
105    /// Should be applied _before_ adding the new item to the batch.
106    pub(crate) fn pre_add<K, I, O, E: Display>(
107        &self,
108        batch_queue: &BatchQueue<K, I, O, E>,
109    ) -> PreAdd
110    where
111        K: 'static + Send + Clone,
112    {
113        // Check if we have capacity to process this item.
114        if batch_queue.is_full() {
115            if batch_queue.at_max_processing_capacity() {
116                return PreAdd::Reject(RejectionReason::MaxConcurrency);
117            } else {
118                // We might still be waiting to process the next batch.
119                return PreAdd::Reject(RejectionReason::BatchFull);
120            }
121        }
122
123        match self {
124            Self::Size if batch_queue.last_space_in_batch() => {
125                if batch_queue.at_max_processing_capacity() {
126                    PreAdd::Add
127                } else {
128                    PreAdd::AddAndProcess
129                }
130            }
131
132            Self::Duration(_dur, on_full) if batch_queue.last_space_in_batch() => {
133                if batch_queue.at_max_processing_capacity() {
134                    PreAdd::Add
135                } else if matches!(on_full, OnFull::Process) {
136                    PreAdd::AddAndProcess
137                } else {
138                    PreAdd::Add
139                }
140            }
141
142            Self::Duration(dur, _on_full) if batch_queue.adding_to_new_batch() => {
143                PreAdd::AddAndProcessAfter(*dur)
144            }
145
146            Self::Immediate if !batch_queue.at_max_processing_capacity() => PreAdd::AddAndProcess,
147
148            _ => PreAdd::Add,
149        }
150    }
151
152    pub(crate) fn post_finish<K, I, O, E: Display>(
153        &self,
154        batch_queue: &BatchQueue<K, I, O, E>,
155    ) -> PostFinish {
156        if !batch_queue.at_max_processing_capacity() {
157            match self {
158                BatchingPolicy::Immediate => PostFinish::Process,
159
160                _ => {
161                    if batch_queue.is_next_batch_full() {
162                        PostFinish::Process
163                    } else {
164                        PostFinish::DoNothing
165                    }
166                }
167            }
168        } else {
169            PostFinish::DoNothing
170        }
171    }
172}