Skip to main content

batch_aint_one/policies/
mod.rs

1//! Batching policies that control when batches get processed.
2
3use std::{
4    fmt::{self, Debug, Display},
5    time::Duration,
6};
7
8use crate::{
9    Limits, Processor,
10    batch_inner::Generation,
11    batch_queue::BatchQueue,
12    error::{ConcurrencyStatus, RejectionReason},
13};
14
15mod balanced;
16mod duration;
17mod immediate;
18mod size;
19
20#[cfg(test)]
21mod test_utils;
22
23/// A policy controlling when batches get processed.
24///
25/// Each policy is optimised for a different use case:
26///
27/// - [`Immediate`](Self::Immediate): Prioritises low latency
28/// - [`Size`](Self::Size): Prioritises high batch utilisation
29/// - [`Duration`](Self::Duration): Prioritises regularity
30/// - [`Balanced`](Self::Balanced): Balances resource efficiency and latency
31#[derive(Debug, Clone)]
32#[non_exhaustive]
33pub enum BatchingPolicy {
34    /// Immediately process the batch if possible.
35    ///
36    /// When concurrency and resources are available, new items will be processed immediately (with
37    /// a batch size of one).
38    ///
39    /// When resources are not immediately available, then the batch will remain open while
40    /// acquiring resources  to allow more items to be added, up to the maximum batch size.
41    ///
42    /// In this way, we try to prioritise larger batch sizes, while still keeping latency low.
43    ///
44    /// When concurrency is maximised, new items will added to the next batch (up to the maximum
45    /// batch size). As soon as a batch finishes the next batch will start. When concurrency is
46    /// limited to 1, it will run batches serially.
47    ///
48    /// Prioritises low latency.
49    Immediate,
50
51    /// Process the batch when it reaches the maximum size.
52    ///
53    /// Prioritises high batch utilisation.
54    Size,
55
56    /// Process the batch a given duration after it was created.
57    ///
58    /// If using `OnFull::Process`, then process the batch when either the duration elapses or the
59    /// batch becomes full, whichever happens first.
60    ///
61    /// Prioritises regularity.
62    Duration(Duration, OnFull),
63
64    /// Balance between resource efficiency and latency based on system load.
65    ///
66    /// When no batches are processing, the first item processes immediately.
67    ///
68    /// When batches are already processing:
69    /// - If batch size < `min_size_hint`: Wait for either the batch to reach `min_size_hint` or
70    ///   any batch to complete
71    /// - If batch size >= `min_size_hint`: Start acquiring resources and process immediately
72    ///
73    /// The `min_size_hint` must be <= `max_batch_size`.
74    ///
75    /// Prioritises efficient resource usage while maintaining reasonable latency.
76    Balanced {
77        /// The minimum batch size to prefer before using additional concurrency.
78        min_size_hint: usize,
79    },
80}
81
82/// What to do when a batch becomes full.
83#[derive(Debug, Clone, Copy)]
84#[non_exhaustive]
85pub enum OnFull {
86    /// Immediately attempt process the batch. If the maximum concurrency has been reached for the
87    /// key, it will reject.
88    Process,
89
90    /// Reject any additional items. The batch will be processed when another condition is reached.
91    Reject,
92}
93
94/// Action to take when adding an item to a batch.
95#[derive(Debug)]
96pub(crate) enum OnAdd {
97    AddAndProcess,
98    AddAndAcquireResources,
99    AddAndProcessAfter(Duration),
100    Reject(RejectionReason),
101    Add,
102}
103
104/// Action to take when a specific generation times out or acquires resources.
105#[derive(Debug)]
106pub(crate) enum OnGenerationEvent {
107    Process,
108    DoNothing,
109}
110
111/// Action to take when a batch finishes processing.
112#[derive(Debug)]
113pub(crate) enum OnFinish {
114    ProcessNext,
115    ProcessNextReady,
116    DoNothing,
117}
118
119impl Display for BatchingPolicy {
120    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
121        match self {
122            BatchingPolicy::Immediate => write!(f, "Immediate"),
123            BatchingPolicy::Size => write!(f, "Size"),
124            BatchingPolicy::Duration(duration, on_full) => {
125                write!(f, "Duration({}ms, {:?})", duration.as_millis(), on_full)
126            }
127            BatchingPolicy::Balanced { min_size_hint } => {
128                write!(f, "Balanced(min_size: {})", min_size_hint)
129            }
130        }
131    }
132}
133
134impl BatchingPolicy {
135    /// Normalise the policy to ensure it's valid for the given limits.
136    pub(crate) fn normalise(self, limits: Limits) -> Self {
137        match self {
138            BatchingPolicy::Balanced { min_size_hint } => BatchingPolicy::Balanced {
139                min_size_hint: min_size_hint.min(limits.max_batch_size),
140            },
141            other => other,
142        }
143    }
144
145    /// Should be applied _before_ adding the new item to the batch.
146    pub(crate) fn on_add<P: Processor>(&self, batch_queue: &BatchQueue<P>) -> OnAdd {
147        if let Some(rejection) = self.should_reject(batch_queue) {
148            return OnAdd::Reject(rejection);
149        }
150
151        match self {
152            Self::Immediate => immediate::on_add(batch_queue),
153            Self::Size => size::on_add(batch_queue),
154            Self::Duration(dur, on_full) => duration::on_add(*dur, *on_full, batch_queue),
155            Self::Balanced { min_size_hint } => balanced::on_add(*min_size_hint, batch_queue),
156        }
157    }
158
159    /// Check if the item should be rejected due to capacity constraints.
160    fn should_reject<P: Processor>(&self, batch_queue: &BatchQueue<P>) -> Option<RejectionReason> {
161        if batch_queue.is_full() {
162            if batch_queue.at_max_total_processing_capacity() {
163                Some(RejectionReason::BatchQueueFull(ConcurrencyStatus::MaxedOut))
164            } else {
165                Some(RejectionReason::BatchQueueFull(
166                    ConcurrencyStatus::Available,
167                ))
168            }
169        } else {
170            None
171        }
172    }
173
174    pub(crate) fn on_timeout<P: Processor>(
175        &self,
176        generation: Generation,
177        batch_queue: &BatchQueue<P>,
178    ) -> OnGenerationEvent {
179        if batch_queue.at_max_total_processing_capacity() {
180            OnGenerationEvent::DoNothing
181        } else {
182            Self::process_generation_if_ready(generation, batch_queue)
183        }
184    }
185
186    pub(crate) fn on_resources_acquired<P: Processor>(
187        &self,
188        generation: Generation,
189        batch_queue: &BatchQueue<P>,
190    ) -> OnGenerationEvent {
191        if batch_queue.at_max_total_processing_capacity() {
192            soft_assert!(
193                false,
194                "on_resources_acquired called when at max processing capacity"
195            );
196            OnGenerationEvent::DoNothing
197        } else {
198            Self::process_generation_if_ready(generation, batch_queue)
199        }
200    }
201
202    pub(crate) fn on_finish<P: Processor>(&self, batch_queue: &BatchQueue<P>) -> OnFinish {
203        if batch_queue.at_max_total_processing_capacity() {
204            soft_assert!(false, "on_finish called when at max processing capacity");
205            return OnFinish::DoNothing;
206        }
207        match self {
208            BatchingPolicy::Immediate => immediate::on_finish(batch_queue),
209            BatchingPolicy::Size => size::on_finish(batch_queue),
210            BatchingPolicy::Duration(_, _) => duration::on_finish(batch_queue),
211            BatchingPolicy::Balanced { .. } => balanced::on_finish(batch_queue),
212        }
213    }
214
215    fn process_generation_if_ready<P: Processor>(
216        generation: Generation,
217        batch_queue: &BatchQueue<P>,
218    ) -> OnGenerationEvent {
219        if batch_queue.is_generation_ready(generation) {
220            OnGenerationEvent::Process
221        } else {
222            OnGenerationEvent::DoNothing
223        }
224    }
225}