batch_aint_one/policies/
mod.rs

//! Batching policies that control when batches get processed.
//!
//! This module provides different batching strategies, each optimised for different use cases:
//!
//! - [`Immediate`](BatchingPolicy::Immediate): Prioritises low latency
//! - [`Size`](BatchingPolicy::Size): Prioritises high batch utilisation
//! - [`Duration`](BatchingPolicy::Duration): Prioritises regularity
//! - [`Balanced`](BatchingPolicy::Balanced): Balances resource efficiency and latency

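//! # Examples
//!
//! A minimal sketch of constructing each policy (assuming `BatchingPolicy` and `OnFull` are
//! re-exported at the crate root; how the policy is wired into a batcher depends on the rest
//! of the crate):
//!
//! ```
//! use std::time::Duration;
//! use batch_aint_one::{BatchingPolicy, OnFull};
//!
//! // Process each item as soon as concurrency and resources allow.
//! let low_latency = BatchingPolicy::Immediate;
//!
//! // Wait until the batch reaches the maximum size before processing.
//! let high_utilisation = BatchingPolicy::Size;
//!
//! // Process 50ms after the batch is created, or as soon as it fills up.
//! let regular = BatchingPolicy::Duration(Duration::from_millis(50), OnFull::Process);
//!
//! // Prefer batches of at least 8 items before using additional concurrency.
//! let balanced = BatchingPolicy::Balanced { min_size_hint: 8 };
//! ```
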
use std::{
    fmt::{self, Debug, Display},
    time::Duration,
};

use tracing::warn;

use crate::{
    Limits, Processor,
    batch_inner::Generation,
    batch_queue::BatchQueue,
    error::{ConcurrencyStatus, RejectionReason},
};

mod balanced;
mod duration;
mod immediate;
mod size;

#[cfg(test)]
mod test_utils;

/// A policy controlling when batches get processed.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum BatchingPolicy {
    /// Immediately process the batch if possible.
    ///
    /// When concurrency and resources are available, new items will be processed immediately
    /// (with a batch size of one).
    ///
    /// When resources are not immediately available, the batch will remain open while resources
    /// are acquired, allowing more items to be added, up to the maximum batch size.
    ///
    /// In this way, we try to prioritise larger batch sizes while still keeping latency low.
    ///
    /// When concurrency is maxed out, new items will be added to the next batch (up to the
    /// maximum batch size). As soon as a batch finishes, the next batch will start. When
    /// concurrency is limited to 1, batches run serially.
    ///
    /// Prioritises low latency.
    Immediate,

    /// Process the batch when it reaches the maximum size.
    ///
    /// Prioritises high batch utilisation.
    Size,

    /// Process the batch a given duration after it was created.
    ///
    /// If using `OnFull::Process`, then process the batch when either the duration elapses or
    /// the batch becomes full, whichever happens first.
    ///
    /// Prioritises regularity.
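    ///
    /// For example, with a 50ms duration and `OnFull::Process`, a batch created at time `T` is
    /// processed at `T + 50ms`, or earlier if it fills up first (a sketch, assuming the types
    /// are re-exported at the crate root):
    ///
    /// ```
    /// use std::time::Duration;
    /// use batch_aint_one::{BatchingPolicy, OnFull};
    ///
    /// let policy = BatchingPolicy::Duration(Duration::from_millis(50), OnFull::Process);
    /// ```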
    Duration(Duration, OnFull),

    /// Balance between resource efficiency and latency based on system load.
    ///
    /// When no batches are processing, the first item processes immediately.
    ///
    /// When batches are already processing:
    /// - If batch size < `min_size_hint`: Wait for either the batch to reach `min_size_hint` or
    ///   any batch to complete
    /// - If batch size >= `min_size_hint`: Start acquiring resources and process immediately
    ///
    /// The `min_size_hint` must be <= the maximum batch size; larger values are clamped down
    /// to it.
    ///
    /// Prioritises efficient resource usage while maintaining reasonable latency.
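    ///
    /// For example, with `min_size_hint: 4`, a new batch waits until it holds at least four
    /// items before consuming additional concurrency (a sketch):
    ///
    /// ```
    /// use batch_aint_one::BatchingPolicy;
    ///
    /// let policy = BatchingPolicy::Balanced { min_size_hint: 4 };
    /// ```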
    Balanced {
        /// The minimum batch size to prefer before using additional concurrency.
        min_size_hint: usize,
    },
}

/// What to do when a batch becomes full.
#[derive(Debug, Clone, Copy)]
#[non_exhaustive]
pub enum OnFull {
    /// Immediately attempt to process the batch. If the maximum concurrency has been reached
    /// for the key, the item will be rejected.
    Process,

    /// Reject any additional items. The batch will be processed when another condition is
    /// reached.
    Reject,
}

/// Action to take when adding an item to a batch.
#[derive(Debug)]
pub(crate) enum OnAdd {
    /// Add the item and process the batch immediately.
    AddAndProcess,
    /// Add the item and start acquiring the resources needed to process the batch.
    AddAndAcquireResources,
    /// Add the item and schedule processing after the given duration.
    AddAndProcessAfter(Duration),
    /// Reject the item for the given reason.
    Reject(RejectionReason),
    /// Add the item and wait for another condition to trigger processing.
    Add,
}

/// Action to take when a specific generation times out or acquires resources.
#[derive(Debug)]
pub(crate) enum OnGenerationEvent {
    /// Process the batch for this generation.
    Process,
    /// Take no action.
    DoNothing,
}

/// Action to take when a batch finishes processing.
#[derive(Debug)]
pub(crate) enum OnFinish {
    /// Process the next batch.
    ProcessNext,
    /// Process the next batch only if it is ready.
    ProcessNextReady,
    /// Take no action.
    DoNothing,
}

impl Display for BatchingPolicy {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            BatchingPolicy::Immediate => write!(f, "Immediate"),
            BatchingPolicy::Size => write!(f, "Size"),
            BatchingPolicy::Duration(duration, on_full) => {
                write!(f, "Duration({}ms, {:?})", duration.as_millis(), on_full)
            }
            BatchingPolicy::Balanced { min_size_hint } => {
                write!(f, "Balanced(min_size: {})", min_size_hint)
            }
        }
    }
}
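
// A quick sketch of the strings produced by the `Display` impl above. This test module is
// illustrative (not part of the original source), but it only exercises code defined in this
// file.
#[cfg(test)]
mod display_format_sketch {
    use super::*;

    #[test]
    fn formats_each_policy() {
        assert_eq!(BatchingPolicy::Immediate.to_string(), "Immediate");
        assert_eq!(BatchingPolicy::Size.to_string(), "Size");
        assert_eq!(
            BatchingPolicy::Duration(Duration::from_millis(50), OnFull::Reject).to_string(),
            "Duration(50ms, Reject)"
        );
        assert_eq!(
            BatchingPolicy::Balanced { min_size_hint: 4 }.to_string(),
            "Balanced(min_size: 4)"
        );
    }
}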

impl BatchingPolicy {
    /// Normalise the policy so that it is valid for the given limits, clamping
    /// `min_size_hint` to the maximum batch size.
    pub(crate) fn normalise(self, limits: Limits) -> Self {
        match self {
            BatchingPolicy::Balanced { min_size_hint } => BatchingPolicy::Balanced {
                min_size_hint: min_size_hint.min(limits.max_batch_size),
            },
            other => other,
        }
    }

    /// Should be applied _before_ adding the new item to the batch.
    pub(crate) fn on_add<P: Processor>(&self, batch_queue: &BatchQueue<P>) -> OnAdd {
        if let Some(rejection) = self.should_reject(batch_queue) {
            return OnAdd::Reject(rejection);
        }

        match self {
            Self::Immediate => immediate::on_add(batch_queue),
            Self::Size => size::on_add(batch_queue),
            Self::Duration(dur, on_full) => duration::on_add(*dur, *on_full, batch_queue),
            Self::Balanced { min_size_hint } => balanced::on_add(*min_size_hint, batch_queue),
        }
    }

    /// Check if the item should be rejected due to capacity constraints.
    fn should_reject<P: Processor>(&self, batch_queue: &BatchQueue<P>) -> Option<RejectionReason> {
        if !batch_queue.is_full() {
            return None;
        }
        let concurrency = if batch_queue.at_max_total_processing_capacity() {
            ConcurrencyStatus::MaxedOut
        } else {
            ConcurrencyStatus::Available
        };
        Some(RejectionReason::BatchQueueFull(concurrency))
    }

    /// Decide what to do when the timer for the given generation elapses.
    pub(crate) fn on_timeout<P: Processor>(
        &self,
        generation: Generation,
        batch_queue: &BatchQueue<P>,
    ) -> OnGenerationEvent {
        if batch_queue.at_max_total_processing_capacity() {
            OnGenerationEvent::DoNothing
        } else {
            Self::process_generation_if_ready(generation, batch_queue)
        }
    }

    /// Decide what to do when resources have been acquired for the given generation.
    pub(crate) fn on_resources_acquired<P: Processor>(
        &self,
        generation: Generation,
        batch_queue: &BatchQueue<P>,
    ) -> OnGenerationEvent {
        if batch_queue.at_max_total_processing_capacity() {
            warn!("on_resources_acquired called when at max processing capacity");
            debug_assert!(
                false,
                "on_resources_acquired called when at max processing capacity"
            );
            OnGenerationEvent::DoNothing
        } else {
            Self::process_generation_if_ready(generation, batch_queue)
        }
    }

    /// Decide what to do when a batch finishes processing.
    pub(crate) fn on_finish<P: Processor>(&self, batch_queue: &BatchQueue<P>) -> OnFinish {
        if batch_queue.at_max_total_processing_capacity() {
            warn!("on_finish called when at max processing capacity");
            debug_assert!(false, "on_finish called when at max processing capacity");
            return OnFinish::DoNothing;
        }
        match self {
            BatchingPolicy::Immediate => immediate::on_finish(batch_queue),
            BatchingPolicy::Size => size::on_finish(batch_queue),
            BatchingPolicy::Duration(_, _) => duration::on_finish(batch_queue),
            BatchingPolicy::Balanced { .. } => balanced::on_finish(batch_queue),
        }
    }

    fn process_generation_if_ready<P: Processor>(
        generation: Generation,
        batch_queue: &BatchQueue<P>,
    ) -> OnGenerationEvent {
        if batch_queue.is_generation_ready(generation) {
            OnGenerationEvent::Process
        } else {
            OnGenerationEvent::DoNothing
        }
    }
}