1pub use state::{CapacityError, CapacityTracker, ConsumerCapacity};
2
3mod state {
4 use crate::pressure::signal::PressureSignal;
5
6 #[derive(Clone, Debug, PartialEq, Eq)]
8 pub struct ConsumerCapacity {
9 pub max_in_flight: usize,
11 pub max_buffer_depth: usize,
13 }
14
15 impl ConsumerCapacity {
16 pub const fn new(
22 max_in_flight: usize,
23 max_buffer_depth: usize,
24 ) -> Result<Self, CapacityError> {
25 if max_in_flight == 0 || max_buffer_depth == 0 {
26 Err(CapacityError::InvalidCapacity {
27 max_in_flight,
28 max_buffer_depth,
29 })
30 } else {
31 Ok(Self {
32 max_in_flight,
33 max_buffer_depth,
34 })
35 }
36 }
37
38 pub const fn validate(&self) -> Result<(), CapacityError> {
44 if self.max_in_flight == 0 || self.max_buffer_depth == 0 {
45 Err(CapacityError::InvalidCapacity {
46 max_in_flight: self.max_in_flight,
47 max_buffer_depth: self.max_buffer_depth,
48 })
49 } else {
50 Ok(())
51 }
52 }
53 }
54
55 #[derive(Clone, Debug, PartialEq, Eq, thiserror::Error)]
57 pub enum CapacityError {
58 #[error("consumer capacity limits must be positive")]
60 InvalidCapacity {
61 max_in_flight: usize,
63 max_buffer_depth: usize,
65 },
66 #[error("cannot decrement in-flight count below zero")]
68 InFlightUnderflow,
69 #[error("cannot decrement buffer depth below zero")]
71 BufferUnderflow,
72 }
73
74 #[derive(Clone, Debug, PartialEq, Eq)]
76 pub struct CapacityTracker {
77 capacity: ConsumerCapacity,
78 current_in_flight: usize,
79 current_buffer_depth: usize,
80 }
81
82 impl CapacityTracker {
83 #[must_use]
85 pub const fn new(capacity: ConsumerCapacity) -> Self {
86 Self {
87 capacity,
88 current_in_flight: 0,
89 current_buffer_depth: 0,
90 }
91 }
92
93 #[must_use]
95 pub const fn capacity(&self) -> &ConsumerCapacity {
96 &self.capacity
97 }
98
99 #[must_use]
101 pub const fn current_in_flight(&self) -> usize {
102 self.current_in_flight
103 }
104
105 #[must_use]
107 pub const fn current_buffer_depth(&self) -> usize {
108 self.current_buffer_depth
109 }
110
111 pub const fn record_delivery(&mut self) {
113 if self.current_in_flight < usize::MAX {
114 self.current_in_flight += 1;
115 }
116 }
117
118 pub const fn record_completion(&mut self) -> Result<(), CapacityError> {
124 if self.current_in_flight == 0 {
125 Err(CapacityError::InFlightUnderflow)
126 } else {
127 self.current_in_flight -= 1;
128 Ok(())
129 }
130 }
131
132 pub const fn record_buffered(&mut self) {
134 if self.current_buffer_depth < usize::MAX {
135 self.current_buffer_depth += 1;
136 }
137 }
138
139 pub const fn record_buffer_drained(&mut self) -> Result<(), CapacityError> {
145 if self.current_buffer_depth == 0 {
146 Err(CapacityError::BufferUnderflow)
147 } else {
148 self.current_buffer_depth -= 1;
149 Ok(())
150 }
151 }
152
153 #[must_use]
155 pub const fn pressure_signal(&self) -> PressureSignal {
156 if self.current_in_flight < self.capacity.max_in_flight {
157 PressureSignal::accept(self.current_in_flight, self.capacity.max_in_flight)
158 } else if self.current_buffer_depth < self.capacity.max_buffer_depth {
159 PressureSignal::defer(
160 self.current_in_flight,
161 self.capacity.max_in_flight,
162 self.current_buffer_depth,
163 self.capacity.max_buffer_depth,
164 )
165 } else {
166 PressureSignal::reject(
167 self.current_in_flight,
168 self.capacity.max_in_flight,
169 self.current_buffer_depth,
170 self.capacity.max_buffer_depth,
171 )
172 }
173 }
174 }
175}
176
177#[cfg(test)]
178mod tests {
179 use super::{CapacityError, CapacityTracker, ConsumerCapacity};
180 use crate::pressure::PressureSignal;
181
182 const fn capacity(max_in_flight: usize, max_buffer_depth: usize) -> ConsumerCapacity {
183 ConsumerCapacity {
184 max_in_flight,
185 max_buffer_depth,
186 }
187 }
188
189 #[test]
190 fn consumer_capacity_constructs_with_public_fields_and_validates_positive_limits() {
191 let declaration = ConsumerCapacity {
192 max_in_flight: 10,
193 max_buffer_depth: 50,
194 };
195
196 assert_eq!(declaration.max_in_flight, 10);
197 assert_eq!(declaration.max_buffer_depth, 50);
198 assert_eq!(declaration.validate(), Ok(()));
199 assert_eq!(ConsumerCapacity::new(10, 50), Ok(declaration));
200 assert_eq!(
201 ConsumerCapacity::new(0, 50),
202 Err(CapacityError::InvalidCapacity {
203 max_in_flight: 0,
204 max_buffer_depth: 50,
205 })
206 );
207 }
208
209 #[test]
210 fn capacity_tracker_starts_empty_and_records_counts() {
211 let mut tracker = CapacityTracker::new(capacity(10, 50));
212
213 assert_eq!(tracker.current_in_flight(), 0);
214 assert_eq!(tracker.current_buffer_depth(), 0);
215 assert_eq!(tracker.capacity(), &capacity(10, 50));
216
217 tracker.record_delivery();
218 assert_eq!(tracker.current_in_flight(), 1);
219
220 assert_eq!(tracker.record_completion(), Ok(()));
221 assert_eq!(tracker.current_in_flight(), 0);
222
223 tracker.record_buffered();
224 assert_eq!(tracker.current_buffer_depth(), 1);
225
226 assert_eq!(tracker.record_buffer_drained(), Ok(()));
227 assert_eq!(tracker.current_buffer_depth(), 0);
228 }
229
230 #[test]
231 fn capacity_tracker_reports_underflow_errors_without_negative_counts() {
232 let mut tracker = CapacityTracker::new(capacity(10, 50));
233
234 assert_eq!(
235 tracker.record_completion(),
236 Err(CapacityError::InFlightUnderflow)
237 );
238 assert_eq!(tracker.current_in_flight(), 0);
239
240 assert_eq!(
241 tracker.record_buffer_drained(),
242 Err(CapacityError::BufferUnderflow)
243 );
244 assert_eq!(tracker.current_buffer_depth(), 0);
245 }
246
247 #[test]
248 fn pressure_signal_accepts_when_in_flight_capacity_is_available() {
249 let mut tracker = CapacityTracker::new(capacity(2, 5));
250 tracker.record_delivery();
251
252 assert_eq!(tracker.pressure_signal(), PressureSignal::accept(1, 2));
253 assert_eq!(tracker.current_in_flight(), 1);
254 assert_eq!(tracker.current_buffer_depth(), 0);
255 }
256
257 #[test]
258 fn pressure_signal_defers_when_in_flight_full_and_buffer_has_capacity() {
259 let mut tracker = CapacityTracker::new(capacity(2, 5));
260 tracker.record_delivery();
261 tracker.record_delivery();
262 tracker.record_buffered();
263 tracker.record_buffered();
264 tracker.record_buffered();
265
266 assert_eq!(tracker.pressure_signal(), PressureSignal::defer(2, 2, 3, 5));
267 assert_eq!(tracker.current_in_flight(), 2);
268 assert_eq!(tracker.current_buffer_depth(), 3);
269 }
270
271 #[test]
272 fn pressure_signal_rejects_when_in_flight_and_buffer_limits_are_reached() {
273 let mut tracker = CapacityTracker::new(capacity(2, 5));
274 tracker.record_delivery();
275 tracker.record_delivery();
276 tracker.record_buffered();
277 tracker.record_buffered();
278 tracker.record_buffered();
279 tracker.record_buffered();
280 tracker.record_buffered();
281
282 assert_eq!(
283 tracker.pressure_signal(),
284 PressureSignal::reject(2, 2, 5, 5)
285 );
286 assert_eq!(tracker.current_in_flight(), 2);
287 assert_eq!(tracker.current_buffer_depth(), 5);
288 }
289
290 #[test]
291 fn pressure_signal_accepts_available_in_flight_regardless_of_buffer_state() {
292 let mut tracker = CapacityTracker::new(capacity(1, 1));
293 tracker.record_buffered();
294
295 assert_eq!(tracker.pressure_signal(), PressureSignal::accept(0, 1));
296 assert_eq!(tracker.current_in_flight(), 0);
297 assert_eq!(tracker.current_buffer_depth(), 1);
298 }
299
300 #[test]
301 fn pressure_root_re_exports_capacity_types() {
302 use crate::pressure::{
303 CapacityError as RootCapacityError, CapacityTracker as RootCapacityTracker,
304 ConsumerCapacity as RootConsumerCapacity,
305 };
306
307 let mut tracker = RootCapacityTracker::new(RootConsumerCapacity {
308 max_in_flight: 1,
309 max_buffer_depth: 1,
310 });
311
312 assert_eq!(
313 tracker.record_completion(),
314 Err(RootCapacityError::InFlightUnderflow)
315 );
316 }
317}