pub use state::{CapacityError, CapacityTracker, ConsumerCapacity};
mod state {
use crate::pressure::signal::PressureSignal;
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ConsumerCapacity {
pub max_in_flight: usize,
pub max_buffer_depth: usize,
}
impl ConsumerCapacity {
pub const fn new(
max_in_flight: usize,
max_buffer_depth: usize,
) -> Result<Self, CapacityError> {
if max_in_flight == 0 || max_buffer_depth == 0 {
Err(CapacityError::InvalidCapacity {
max_in_flight,
max_buffer_depth,
})
} else {
Ok(Self {
max_in_flight,
max_buffer_depth,
})
}
}
pub const fn validate(&self) -> Result<(), CapacityError> {
if self.max_in_flight == 0 || self.max_buffer_depth == 0 {
Err(CapacityError::InvalidCapacity {
max_in_flight: self.max_in_flight,
max_buffer_depth: self.max_buffer_depth,
})
} else {
Ok(())
}
}
}
#[derive(Clone, Debug, PartialEq, Eq, thiserror::Error)]
pub enum CapacityError {
#[error("consumer capacity limits must be positive")]
InvalidCapacity {
max_in_flight: usize,
max_buffer_depth: usize,
},
#[error("cannot decrement in-flight count below zero")]
InFlightUnderflow,
#[error("cannot decrement buffer depth below zero")]
BufferUnderflow,
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct CapacityTracker {
capacity: ConsumerCapacity,
current_in_flight: usize,
current_buffer_depth: usize,
}
impl CapacityTracker {
#[must_use]
pub const fn new(capacity: ConsumerCapacity) -> Self {
Self {
capacity,
current_in_flight: 0,
current_buffer_depth: 0,
}
}
#[must_use]
pub const fn capacity(&self) -> &ConsumerCapacity {
&self.capacity
}
#[must_use]
pub const fn current_in_flight(&self) -> usize {
self.current_in_flight
}
#[must_use]
pub const fn current_buffer_depth(&self) -> usize {
self.current_buffer_depth
}
pub const fn record_delivery(&mut self) {
if self.current_in_flight < usize::MAX {
self.current_in_flight += 1;
}
}
pub const fn record_completion(&mut self) -> Result<(), CapacityError> {
if self.current_in_flight == 0 {
Err(CapacityError::InFlightUnderflow)
} else {
self.current_in_flight -= 1;
Ok(())
}
}
pub const fn record_buffered(&mut self) {
if self.current_buffer_depth < usize::MAX {
self.current_buffer_depth += 1;
}
}
pub const fn record_buffer_drained(&mut self) -> Result<(), CapacityError> {
if self.current_buffer_depth == 0 {
Err(CapacityError::BufferUnderflow)
} else {
self.current_buffer_depth -= 1;
Ok(())
}
}
#[must_use]
pub const fn pressure_signal(&self) -> PressureSignal {
if self.current_in_flight < self.capacity.max_in_flight {
PressureSignal::accept(self.current_in_flight, self.capacity.max_in_flight)
} else if self.current_buffer_depth < self.capacity.max_buffer_depth {
PressureSignal::defer(
self.current_in_flight,
self.capacity.max_in_flight,
self.current_buffer_depth,
self.capacity.max_buffer_depth,
)
} else {
PressureSignal::reject(
self.current_in_flight,
self.capacity.max_in_flight,
self.current_buffer_depth,
self.capacity.max_buffer_depth,
)
}
}
}
}
#[cfg(test)]
mod tests {
use super::{CapacityError, CapacityTracker, ConsumerCapacity};
use crate::pressure::PressureSignal;
const fn capacity(max_in_flight: usize, max_buffer_depth: usize) -> ConsumerCapacity {
ConsumerCapacity {
max_in_flight,
max_buffer_depth,
}
}
#[test]
fn consumer_capacity_constructs_with_public_fields_and_validates_positive_limits() {
let declaration = ConsumerCapacity {
max_in_flight: 10,
max_buffer_depth: 50,
};
assert_eq!(declaration.max_in_flight, 10);
assert_eq!(declaration.max_buffer_depth, 50);
assert_eq!(declaration.validate(), Ok(()));
assert_eq!(ConsumerCapacity::new(10, 50), Ok(declaration));
assert_eq!(
ConsumerCapacity::new(0, 50),
Err(CapacityError::InvalidCapacity {
max_in_flight: 0,
max_buffer_depth: 50,
})
);
}
#[test]
fn capacity_tracker_starts_empty_and_records_counts() {
let mut tracker = CapacityTracker::new(capacity(10, 50));
assert_eq!(tracker.current_in_flight(), 0);
assert_eq!(tracker.current_buffer_depth(), 0);
assert_eq!(tracker.capacity(), &capacity(10, 50));
tracker.record_delivery();
assert_eq!(tracker.current_in_flight(), 1);
assert_eq!(tracker.record_completion(), Ok(()));
assert_eq!(tracker.current_in_flight(), 0);
tracker.record_buffered();
assert_eq!(tracker.current_buffer_depth(), 1);
assert_eq!(tracker.record_buffer_drained(), Ok(()));
assert_eq!(tracker.current_buffer_depth(), 0);
}
#[test]
fn capacity_tracker_reports_underflow_errors_without_negative_counts() {
let mut tracker = CapacityTracker::new(capacity(10, 50));
assert_eq!(
tracker.record_completion(),
Err(CapacityError::InFlightUnderflow)
);
assert_eq!(tracker.current_in_flight(), 0);
assert_eq!(
tracker.record_buffer_drained(),
Err(CapacityError::BufferUnderflow)
);
assert_eq!(tracker.current_buffer_depth(), 0);
}
#[test]
fn pressure_signal_accepts_when_in_flight_capacity_is_available() {
let mut tracker = CapacityTracker::new(capacity(2, 5));
tracker.record_delivery();
assert_eq!(tracker.pressure_signal(), PressureSignal::accept(1, 2));
assert_eq!(tracker.current_in_flight(), 1);
assert_eq!(tracker.current_buffer_depth(), 0);
}
#[test]
fn pressure_signal_defers_when_in_flight_full_and_buffer_has_capacity() {
let mut tracker = CapacityTracker::new(capacity(2, 5));
tracker.record_delivery();
tracker.record_delivery();
tracker.record_buffered();
tracker.record_buffered();
tracker.record_buffered();
assert_eq!(tracker.pressure_signal(), PressureSignal::defer(2, 2, 3, 5));
assert_eq!(tracker.current_in_flight(), 2);
assert_eq!(tracker.current_buffer_depth(), 3);
}
#[test]
fn pressure_signal_rejects_when_in_flight_and_buffer_limits_are_reached() {
let mut tracker = CapacityTracker::new(capacity(2, 5));
tracker.record_delivery();
tracker.record_delivery();
tracker.record_buffered();
tracker.record_buffered();
tracker.record_buffered();
tracker.record_buffered();
tracker.record_buffered();
assert_eq!(
tracker.pressure_signal(),
PressureSignal::reject(2, 2, 5, 5)
);
assert_eq!(tracker.current_in_flight(), 2);
assert_eq!(tracker.current_buffer_depth(), 5);
}
#[test]
fn pressure_signal_accepts_available_in_flight_regardless_of_buffer_state() {
let mut tracker = CapacityTracker::new(capacity(1, 1));
tracker.record_buffered();
assert_eq!(tracker.pressure_signal(), PressureSignal::accept(0, 1));
assert_eq!(tracker.current_in_flight(), 0);
assert_eq!(tracker.current_buffer_depth(), 1);
}
#[test]
fn pressure_root_re_exports_capacity_types() {
use crate::pressure::{
CapacityError as RootCapacityError, CapacityTracker as RootCapacityTracker,
ConsumerCapacity as RootConsumerCapacity,
};
let mut tracker = RootCapacityTracker::new(RootConsumerCapacity {
max_in_flight: 1,
max_buffer_depth: 1,
});
assert_eq!(
tracker.record_completion(),
Err(RootCapacityError::InFlightUnderflow)
);
}
}