batch_aint_one/policies/
mod.rs1use std::{
11 fmt::{self, Debug, Display},
12 time::Duration,
13};
14
15use tracing::warn;
16
17use crate::{
18 Limits, Processor,
19 batch_inner::Generation,
20 batch_queue::BatchQueue,
21 error::{ConcurrencyStatus, RejectionReason},
22};
23
24mod balanced;
25mod duration;
26mod immediate;
27mod size;
28
29#[cfg(test)]
30mod test_utils;
31
32#[derive(Debug, Clone)]
34#[non_exhaustive]
35pub enum BatchingPolicy {
36 Immediate,
52
53 Size,
57
58 Duration(Duration, OnFull),
65
66 Balanced {
79 min_size_hint: usize,
81 },
82}
83
/// Setting carried by `BatchingPolicy::Duration` and forwarded to the
/// `duration` strategy module.
///
/// NOTE(review): the name suggests it selects what happens when a batch fills
/// up before its duration elapses — confirm against the `duration` module.
#[derive(Debug, Clone, Copy)]
#[non_exhaustive]
pub enum OnFull {
    /// NOTE(review): presumably the full batch is processed — confirm.
    Process,

    /// NOTE(review): presumably further items are rejected — confirm.
    Reject,
}
95
96#[derive(Debug)]
98pub(crate) enum OnAdd {
99 AddAndProcess,
100 AddAndAcquireResources,
101 AddAndProcessAfter(Duration),
102 Reject(RejectionReason),
103 Add,
104}
105
/// Decision returned by `BatchingPolicy::on_timeout` and
/// `BatchingPolicy::on_resources_acquired` for a given generation.
#[derive(Debug)]
pub(crate) enum OnGenerationEvent {
    /// Returned when the queue is below its maximum processing capacity and
    /// reports the generation as ready.
    Process,
    /// Returned when the queue is at maximum processing capacity, or the
    /// generation is not ready.
    DoNothing,
}
112
/// Decision returned by `BatchingPolicy::on_finish`.
///
/// `DoNothing` is what the policy itself returns when the queue still reports
/// maximum processing capacity; otherwise the value comes from the
/// per-strategy `on_finish` function.
#[derive(Debug)]
pub(crate) enum OnFinish {
    /// NOTE(review): presumably process the next batch regardless of its
    /// readiness — confirm against the strategy modules.
    ProcessNext,
    /// NOTE(review): presumably process the next batch only if it is ready —
    /// confirm against the strategy modules.
    ProcessNextReady,
    /// Take no action.
    DoNothing,
}
120
121impl Display for BatchingPolicy {
122 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
123 match self {
124 BatchingPolicy::Immediate => write!(f, "Immediate"),
125 BatchingPolicy::Size => write!(f, "Size"),
126 BatchingPolicy::Duration(duration, on_full) => {
127 write!(f, "Duration({}ms, {:?})", duration.as_millis(), on_full)
128 }
129 BatchingPolicy::Balanced { min_size_hint } => {
130 write!(f, "Balanced(min_size: {})", min_size_hint)
131 }
132 }
133 }
134}
135
136impl BatchingPolicy {
137 pub(crate) fn normalise(self, limits: Limits) -> Self {
139 match self {
140 BatchingPolicy::Balanced { min_size_hint } => BatchingPolicy::Balanced {
141 min_size_hint: min_size_hint.min(limits.max_batch_size),
142 },
143 other => other,
144 }
145 }
146
147 pub(crate) fn on_add<P: Processor>(&self, batch_queue: &BatchQueue<P>) -> OnAdd {
149 if let Some(rejection) = self.should_reject(batch_queue) {
150 return OnAdd::Reject(rejection);
151 }
152
153 match self {
154 Self::Immediate => immediate::on_add(batch_queue),
155 Self::Size => size::on_add(batch_queue),
156 Self::Duration(dur, on_full) => duration::on_add(*dur, *on_full, batch_queue),
157 Self::Balanced { min_size_hint } => balanced::on_add(*min_size_hint, batch_queue),
158 }
159 }
160
161 fn should_reject<P: Processor>(&self, batch_queue: &BatchQueue<P>) -> Option<RejectionReason> {
163 if batch_queue.is_full() {
164 if batch_queue.at_max_total_processing_capacity() {
165 Some(RejectionReason::BatchQueueFull(ConcurrencyStatus::MaxedOut))
166 } else {
167 Some(RejectionReason::BatchQueueFull(
168 ConcurrencyStatus::Available,
169 ))
170 }
171 } else {
172 None
173 }
174 }
175
176 pub(crate) fn on_timeout<P: Processor>(
177 &self,
178 generation: Generation,
179 batch_queue: &BatchQueue<P>,
180 ) -> OnGenerationEvent {
181 if batch_queue.at_max_total_processing_capacity() {
182 OnGenerationEvent::DoNothing
183 } else {
184 Self::process_generation_if_ready(generation, batch_queue)
185 }
186 }
187
188 pub(crate) fn on_resources_acquired<P: Processor>(
189 &self,
190 generation: Generation,
191 batch_queue: &BatchQueue<P>,
192 ) -> OnGenerationEvent {
193 if batch_queue.at_max_total_processing_capacity() {
194 warn!("on_resources_acquired called when at max processing capacity");
195 debug_assert!(
196 false,
197 "on_resources_acquired called when at max processing capacity"
198 );
199 OnGenerationEvent::DoNothing
200 } else {
201 Self::process_generation_if_ready(generation, batch_queue)
202 }
203 }
204
205 pub(crate) fn on_finish<P: Processor>(&self, batch_queue: &BatchQueue<P>) -> OnFinish {
206 if batch_queue.at_max_total_processing_capacity() {
207 warn!("on_finish called when at max processing capacity");
208 debug_assert!(false, "on_finish called when at max processing capacity");
209 return OnFinish::DoNothing;
210 }
211 match self {
212 BatchingPolicy::Immediate => immediate::on_finish(batch_queue),
213 BatchingPolicy::Size => size::on_finish(batch_queue),
214 BatchingPolicy::Duration(_, _) => duration::on_finish(batch_queue),
215 BatchingPolicy::Balanced { .. } => balanced::on_finish(batch_queue),
216 }
217 }
218
219 fn process_generation_if_ready<P: Processor>(
220 generation: Generation,
221 batch_queue: &BatchQueue<P>,
222 ) -> OnGenerationEvent {
223 if batch_queue.is_generation_ready(generation) {
224 OnGenerationEvent::Process
225 } else {
226 OnGenerationEvent::DoNothing
227 }
228 }
229}