Skip to main content

liminal/pressure/
capacity.rs

1pub use state::{CapacityError, CapacityTracker, ConsumerCapacity};
2
3mod state {
4    use crate::pressure::signal::PressureSignal;
5
6    /// Consumer-declared capacity limits for pressure-aware delivery.
7    #[derive(Clone, Debug, PartialEq, Eq)]
8    pub struct ConsumerCapacity {
9        /// Maximum messages this consumer can process concurrently.
10        pub max_in_flight: usize,
11        /// Maximum messages that may wait for this consumer's capacity to free.
12        pub max_buffer_depth: usize,
13    }
14
15    impl ConsumerCapacity {
16        /// Creates a capacity declaration after verifying both limits are positive.
17        ///
18        /// # Errors
19        ///
20        /// Returns [`CapacityError::InvalidCapacity`] when either declared limit is zero.
21        pub const fn new(
22            max_in_flight: usize,
23            max_buffer_depth: usize,
24        ) -> Result<Self, CapacityError> {
25            if max_in_flight == 0 || max_buffer_depth == 0 {
26                Err(CapacityError::InvalidCapacity {
27                    max_in_flight,
28                    max_buffer_depth,
29                })
30            } else {
31                Ok(Self {
32                    max_in_flight,
33                    max_buffer_depth,
34                })
35            }
36        }
37
38        /// Verifies that the declared capacity contains positive limits.
39        ///
40        /// # Errors
41        ///
42        /// Returns [`CapacityError::InvalidCapacity`] when either declared limit is zero.
43        pub const fn validate(&self) -> Result<(), CapacityError> {
44            if self.max_in_flight == 0 || self.max_buffer_depth == 0 {
45                Err(CapacityError::InvalidCapacity {
46                    max_in_flight: self.max_in_flight,
47                    max_buffer_depth: self.max_buffer_depth,
48                })
49            } else {
50                Ok(())
51            }
52        }
53    }
54
55    /// Capacity tracking failures that keep counters from entering invalid states.
56    #[derive(Clone, Debug, PartialEq, Eq, thiserror::Error)]
57    pub enum CapacityError {
58        /// A capacity declaration used zero for at least one required positive limit.
59        #[error("consumer capacity limits must be positive")]
60        InvalidCapacity {
61            /// Declared maximum in-flight messages.
62            max_in_flight: usize,
63            /// Declared maximum buffered messages.
64            max_buffer_depth: usize,
65        },
66        /// Processing completion was recorded while no message was in flight.
67        #[error("cannot decrement in-flight count below zero")]
68        InFlightUnderflow,
69        /// Buffer removal was recorded while no message was buffered.
70        #[error("cannot decrement buffer depth below zero")]
71        BufferUnderflow,
72    }
73
74    /// Per-consumer tracker for current in-flight and buffered message counts.
75    #[derive(Clone, Debug, PartialEq, Eq)]
76    pub struct CapacityTracker {
77        capacity: ConsumerCapacity,
78        current_in_flight: usize,
79        current_buffer_depth: usize,
80    }
81
82    impl CapacityTracker {
83        /// Creates a tracker for an explicitly declared consumer capacity.
84        #[must_use]
85        pub const fn new(capacity: ConsumerCapacity) -> Self {
86            Self {
87                capacity,
88                current_in_flight: 0,
89                current_buffer_depth: 0,
90            }
91        }
92
93        /// Returns the consumer capacity declaration this tracker follows.
94        #[must_use]
95        pub const fn capacity(&self) -> &ConsumerCapacity {
96            &self.capacity
97        }
98
99        /// Returns the number of messages currently being processed by the consumer.
100        #[must_use]
101        pub const fn current_in_flight(&self) -> usize {
102            self.current_in_flight
103        }
104
105        /// Returns the number of messages currently buffered for the consumer.
106        #[must_use]
107        pub const fn current_buffer_depth(&self) -> usize {
108            self.current_buffer_depth
109        }
110
111        /// Records that a message was delivered and processing began.
112        pub const fn record_delivery(&mut self) {
113            if self.current_in_flight < usize::MAX {
114                self.current_in_flight += 1;
115            }
116        }
117
118        /// Records that processing completed for one in-flight message.
119        ///
120        /// # Errors
121        ///
122        /// Returns [`CapacityError::InFlightUnderflow`] if no message is currently in flight.
123        pub const fn record_completion(&mut self) -> Result<(), CapacityError> {
124            if self.current_in_flight == 0 {
125                Err(CapacityError::InFlightUnderflow)
126            } else {
127                self.current_in_flight -= 1;
128                Ok(())
129            }
130        }
131
132        /// Records that a message was buffered pending consumer capacity.
133        pub const fn record_buffered(&mut self) {
134            if self.current_buffer_depth < usize::MAX {
135                self.current_buffer_depth += 1;
136            }
137        }
138
139        /// Records that one buffered message left the buffer.
140        ///
141        /// # Errors
142        ///
143        /// Returns [`CapacityError::BufferUnderflow`] if no message is currently buffered.
144        pub const fn record_buffer_drained(&mut self) -> Result<(), CapacityError> {
145            if self.current_buffer_depth == 0 {
146                Err(CapacityError::BufferUnderflow)
147            } else {
148                self.current_buffer_depth -= 1;
149                Ok(())
150            }
151        }
152
153        /// Determines the pressure signal for the next message without mutating counters.
154        #[must_use]
155        pub const fn pressure_signal(&self) -> PressureSignal {
156            if self.current_in_flight < self.capacity.max_in_flight {
157                PressureSignal::accept(self.current_in_flight, self.capacity.max_in_flight)
158            } else if self.current_buffer_depth < self.capacity.max_buffer_depth {
159                PressureSignal::defer(
160                    self.current_in_flight,
161                    self.capacity.max_in_flight,
162                    self.current_buffer_depth,
163                    self.capacity.max_buffer_depth,
164                )
165            } else {
166                PressureSignal::reject(
167                    self.current_in_flight,
168                    self.capacity.max_in_flight,
169                    self.current_buffer_depth,
170                    self.capacity.max_buffer_depth,
171                )
172            }
173        }
174    }
175}
176
#[cfg(test)]
mod tests {
    use super::{CapacityError, CapacityTracker, ConsumerCapacity};
    use crate::pressure::PressureSignal;

    /// Builds a declaration through the public fields, bypassing `ConsumerCapacity::new`.
    const fn capacity(max_in_flight: usize, max_buffer_depth: usize) -> ConsumerCapacity {
        ConsumerCapacity {
            max_in_flight,
            max_buffer_depth,
        }
    }

    /// Creates a tracker and records the requested number of deliveries and buffered messages.
    fn tracker_with_load(
        limits: ConsumerCapacity,
        in_flight: usize,
        buffered: usize,
    ) -> CapacityTracker {
        let mut tracker = CapacityTracker::new(limits);
        for _ in 0..in_flight {
            tracker.record_delivery();
        }
        for _ in 0..buffered {
            tracker.record_buffered();
        }
        tracker
    }

    #[test]
    fn consumer_capacity_constructs_with_public_fields_and_validates_positive_limits() {
        let declaration = ConsumerCapacity {
            max_in_flight: 10,
            max_buffer_depth: 50,
        };

        assert_eq!(declaration.max_in_flight, 10);
        assert_eq!(declaration.max_buffer_depth, 50);
        assert_eq!(declaration.validate(), Ok(()));
        assert_eq!(ConsumerCapacity::new(10, 50), Ok(declaration));
    }

    #[test]
    fn consumer_capacity_new_rejects_zero_in_either_limit() {
        // Covers zero in-flight, zero buffer depth, and both at once.
        for (max_in_flight, max_buffer_depth) in [(0, 50), (10, 0), (0, 0)] {
            assert_eq!(
                ConsumerCapacity::new(max_in_flight, max_buffer_depth),
                Err(CapacityError::InvalidCapacity {
                    max_in_flight,
                    max_buffer_depth,
                })
            );
        }
    }

    #[test]
    fn consumer_capacity_validate_rejects_zero_limits_set_through_public_fields() {
        // Struct-literal construction skips `new`, so `validate` is the only guard left.
        for (max_in_flight, max_buffer_depth) in [(0, 50), (10, 0), (0, 0)] {
            assert_eq!(
                capacity(max_in_flight, max_buffer_depth).validate(),
                Err(CapacityError::InvalidCapacity {
                    max_in_flight,
                    max_buffer_depth,
                })
            );
        }
    }

    #[test]
    fn capacity_tracker_starts_empty_and_records_counts() {
        let mut tracker = CapacityTracker::new(capacity(10, 50));

        assert_eq!(tracker.current_in_flight(), 0);
        assert_eq!(tracker.current_buffer_depth(), 0);
        assert_eq!(tracker.capacity(), &capacity(10, 50));

        tracker.record_delivery();
        assert_eq!(tracker.current_in_flight(), 1);

        assert_eq!(tracker.record_completion(), Ok(()));
        assert_eq!(tracker.current_in_flight(), 0);

        tracker.record_buffered();
        assert_eq!(tracker.current_buffer_depth(), 1);

        assert_eq!(tracker.record_buffer_drained(), Ok(()));
        assert_eq!(tracker.current_buffer_depth(), 0);
    }

    #[test]
    fn capacity_tracker_reports_underflow_errors_without_negative_counts() {
        let mut tracker = CapacityTracker::new(capacity(10, 50));

        assert_eq!(
            tracker.record_completion(),
            Err(CapacityError::InFlightUnderflow)
        );
        assert_eq!(tracker.current_in_flight(), 0);

        assert_eq!(
            tracker.record_buffer_drained(),
            Err(CapacityError::BufferUnderflow)
        );
        assert_eq!(tracker.current_buffer_depth(), 0);
    }

    #[test]
    fn pressure_signal_accepts_when_in_flight_capacity_is_available() {
        let tracker = tracker_with_load(capacity(2, 5), 1, 0);

        assert_eq!(tracker.pressure_signal(), PressureSignal::accept(1, 2));
        // Asking for the signal must leave the counters untouched.
        assert_eq!(tracker.current_in_flight(), 1);
        assert_eq!(tracker.current_buffer_depth(), 0);
    }

    #[test]
    fn pressure_signal_defers_when_in_flight_full_and_buffer_has_capacity() {
        let tracker = tracker_with_load(capacity(2, 5), 2, 3);

        assert_eq!(tracker.pressure_signal(), PressureSignal::defer(2, 2, 3, 5));
        assert_eq!(tracker.current_in_flight(), 2);
        assert_eq!(tracker.current_buffer_depth(), 3);
    }

    #[test]
    fn pressure_signal_rejects_when_in_flight_and_buffer_limits_are_reached() {
        let tracker = tracker_with_load(capacity(2, 5), 2, 5);

        assert_eq!(
            tracker.pressure_signal(),
            PressureSignal::reject(2, 2, 5, 5)
        );
        assert_eq!(tracker.current_in_flight(), 2);
        assert_eq!(tracker.current_buffer_depth(), 5);
    }

    #[test]
    fn pressure_signal_accepts_available_in_flight_regardless_of_buffer_state() {
        // The buffer is full, yet free in-flight capacity still yields `accept`.
        let tracker = tracker_with_load(capacity(1, 1), 0, 1);

        assert_eq!(tracker.pressure_signal(), PressureSignal::accept(0, 1));
        assert_eq!(tracker.current_in_flight(), 0);
        assert_eq!(tracker.current_buffer_depth(), 1);
    }

    #[test]
    fn pressure_root_re_exports_capacity_types() {
        use crate::pressure::{
            CapacityError as RootCapacityError, CapacityTracker as RootCapacityTracker,
            ConsumerCapacity as RootConsumerCapacity,
        };

        let mut tracker = RootCapacityTracker::new(RootConsumerCapacity {
            max_in_flight: 1,
            max_buffer_depth: 1,
        });

        assert_eq!(
            tracker.record_completion(),
            Err(RootCapacityError::InFlightUnderflow)
        );
    }
}