batch_aint_one/policies/
mod.rs1use std::{
4 fmt::{self, Debug, Display},
5 time::Duration,
6};
7
8use crate::{
9 Limits, Processor,
10 batch_inner::Generation,
11 batch_queue::BatchQueue,
12 error::{ConcurrencyStatus, RejectionReason},
13};
14
15mod balanced;
16mod duration;
17mod immediate;
18mod size;
19
20#[cfg(test)]
21mod test_utils;
22
23#[derive(Debug, Clone)]
32#[non_exhaustive]
33pub enum BatchingPolicy {
34 Immediate,
50
51 Size,
55
56 Duration(Duration, OnFull),
63
64 Balanced {
77 min_size_hint: usize,
79 },
80}
81
82#[derive(Debug, Clone, Copy)]
84#[non_exhaustive]
85pub enum OnFull {
86 Process,
89
90 Reject,
92}
93
94#[derive(Debug)]
96pub(crate) enum OnAdd {
97 AddAndProcess,
98 AddAndAcquireResources,
99 AddAndProcessAfter(Duration),
100 Reject(RejectionReason),
101 Add,
102}
103
104#[derive(Debug)]
106pub(crate) enum OnGenerationEvent {
107 Process,
108 DoNothing,
109}
110
111#[derive(Debug)]
113pub(crate) enum OnFinish {
114 ProcessNext,
115 ProcessNextReady,
116 DoNothing,
117}
118
119impl Display for BatchingPolicy {
120 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
121 match self {
122 BatchingPolicy::Immediate => write!(f, "Immediate"),
123 BatchingPolicy::Size => write!(f, "Size"),
124 BatchingPolicy::Duration(duration, on_full) => {
125 write!(f, "Duration({}ms, {:?})", duration.as_millis(), on_full)
126 }
127 BatchingPolicy::Balanced { min_size_hint } => {
128 write!(f, "Balanced(min_size: {})", min_size_hint)
129 }
130 }
131 }
132}
133
134impl BatchingPolicy {
135 pub(crate) fn normalise(self, limits: Limits) -> Self {
137 match self {
138 BatchingPolicy::Balanced { min_size_hint } => BatchingPolicy::Balanced {
139 min_size_hint: min_size_hint.min(limits.max_batch_size),
140 },
141 other => other,
142 }
143 }
144
145 pub(crate) fn on_add<P: Processor>(&self, batch_queue: &BatchQueue<P>) -> OnAdd {
147 if let Some(rejection) = self.should_reject(batch_queue) {
148 return OnAdd::Reject(rejection);
149 }
150
151 match self {
152 Self::Immediate => immediate::on_add(batch_queue),
153 Self::Size => size::on_add(batch_queue),
154 Self::Duration(dur, on_full) => duration::on_add(*dur, *on_full, batch_queue),
155 Self::Balanced { min_size_hint } => balanced::on_add(*min_size_hint, batch_queue),
156 }
157 }
158
159 fn should_reject<P: Processor>(&self, batch_queue: &BatchQueue<P>) -> Option<RejectionReason> {
161 if batch_queue.is_full() {
162 if batch_queue.at_max_total_processing_capacity() {
163 Some(RejectionReason::BatchQueueFull(ConcurrencyStatus::MaxedOut))
164 } else {
165 Some(RejectionReason::BatchQueueFull(
166 ConcurrencyStatus::Available,
167 ))
168 }
169 } else {
170 None
171 }
172 }
173
174 pub(crate) fn on_timeout<P: Processor>(
175 &self,
176 generation: Generation,
177 batch_queue: &BatchQueue<P>,
178 ) -> OnGenerationEvent {
179 if batch_queue.at_max_total_processing_capacity() {
180 OnGenerationEvent::DoNothing
181 } else {
182 Self::process_generation_if_ready(generation, batch_queue)
183 }
184 }
185
186 pub(crate) fn on_resources_acquired<P: Processor>(
187 &self,
188 generation: Generation,
189 batch_queue: &BatchQueue<P>,
190 ) -> OnGenerationEvent {
191 if batch_queue.at_max_total_processing_capacity() {
192 soft_assert!(
193 false,
194 "on_resources_acquired called when at max processing capacity"
195 );
196 OnGenerationEvent::DoNothing
197 } else {
198 Self::process_generation_if_ready(generation, batch_queue)
199 }
200 }
201
202 pub(crate) fn on_finish<P: Processor>(&self, batch_queue: &BatchQueue<P>) -> OnFinish {
203 if batch_queue.at_max_total_processing_capacity() {
204 soft_assert!(false, "on_finish called when at max processing capacity");
205 return OnFinish::DoNothing;
206 }
207 match self {
208 BatchingPolicy::Immediate => immediate::on_finish(batch_queue),
209 BatchingPolicy::Size => size::on_finish(batch_queue),
210 BatchingPolicy::Duration(_, _) => duration::on_finish(batch_queue),
211 BatchingPolicy::Balanced { .. } => balanced::on_finish(batch_queue),
212 }
213 }
214
215 fn process_generation_if_ready<P: Processor>(
216 generation: Generation,
217 batch_queue: &BatchQueue<P>,
218 ) -> OnGenerationEvent {
219 if batch_queue.is_generation_ready(generation) {
220 OnGenerationEvent::Process
221 } else {
222 OnGenerationEvent::DoNothing
223 }
224 }
225}