#[derive(Debug, Clone, Copy)]
pub struct BufferWatermarks {
pub low: usize,
pub high: usize,
}
impl Default for BufferWatermarks {
fn default() -> Self {
Self {
low: 1024, high: 8 * 1024, }
}
}
impl BufferWatermarks {
pub fn new(low: usize, high: usize) -> Self {
assert!(low < high, "low watermark must be less than high");
Self { low, high }
}
#[must_use]
pub fn should_backpressure(&self, current: usize) -> bool {
current >= self.high
}
#[must_use]
pub fn can_write(&self, current: usize) -> bool {
current < self.low
}
#[must_use]
pub fn pressure_level(&self, current: usize) -> f64 {
(current as f64 / self.high as f64).min(1.0)
}
}
#[derive(Debug)]
pub struct WatermarkedBuffer {
data: Vec<u8>,
watermarks: BufferWatermarks,
}
impl WatermarkedBuffer {
pub fn new(watermarks: BufferWatermarks) -> Self {
Self { data: Vec::with_capacity(watermarks.high), watermarks }
}
#[must_use]
pub fn should_backpressure(&self) -> bool {
self.watermarks.should_backpressure(self.data.len())
}
#[must_use]
pub fn can_write(&self) -> bool {
self.watermarks.can_write(self.data.len())
}
#[must_use]
pub fn len(&self) -> usize {
self.data.len()
}
#[must_use]
pub fn is_empty(&self) -> bool {
self.data.is_empty()
}
pub fn write(&mut self, data: &[u8]) {
contract_pre_write!();
self.data.extend_from_slice(data);
}
pub fn drain(&mut self, amount: usize) -> Vec<u8> {
let amount = amount.min(self.data.len());
self.data.drain(..amount).collect()
}
pub fn clear(&mut self) {
self.data.clear();
}
#[must_use]
pub fn watermarks(&self) -> BufferWatermarks {
self.watermarks
}
#[must_use]
pub fn pressure_level(&self) -> f64 {
self.watermarks.pressure_level(self.data.len())
}
}
impl Default for WatermarkedBuffer {
fn default() -> Self {
Self::new(BufferWatermarks::default())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_buffer_watermarks_default() {
let wm = BufferWatermarks::default();
assert_eq!(wm.low, 1024);
assert_eq!(wm.high, 8 * 1024);
}
#[test]
fn test_buffer_watermarks_new() {
let wm = BufferWatermarks::new(100, 1000);
assert_eq!(wm.low, 100);
assert_eq!(wm.high, 1000);
}
#[test]
#[should_panic(expected = "low watermark must be less than high")]
fn test_buffer_watermarks_invalid() {
BufferWatermarks::new(1000, 100);
}
#[test]
fn test_buffer_watermarks_backpressure() {
let wm = BufferWatermarks::new(100, 1000);
assert!(!wm.should_backpressure(500));
assert!(!wm.should_backpressure(999));
assert!(wm.should_backpressure(1000));
assert!(wm.should_backpressure(1500));
}
#[test]
fn test_buffer_watermarks_can_write() {
let wm = BufferWatermarks::new(100, 1000);
assert!(wm.can_write(50));
assert!(wm.can_write(99));
assert!(!wm.can_write(100));
assert!(!wm.can_write(500));
}
#[test]
fn test_buffer_watermarks_pressure_level() {
let wm = BufferWatermarks::new(100, 1000);
assert!((wm.pressure_level(0) - 0.0).abs() < 0.001);
assert!((wm.pressure_level(500) - 0.5).abs() < 0.001);
assert!((wm.pressure_level(1000) - 1.0).abs() < 0.001);
assert!((wm.pressure_level(2000) - 1.0).abs() < 0.001); }
#[test]
fn test_watermarked_buffer_new() {
let wm = BufferWatermarks::new(100, 1000);
let buffer = WatermarkedBuffer::new(wm);
assert!(buffer.is_empty());
assert_eq!(buffer.len(), 0);
}
#[test]
fn test_watermarked_buffer_write() {
let mut buffer = WatermarkedBuffer::default();
buffer.write(&[1, 2, 3, 4, 5]);
assert_eq!(buffer.len(), 5);
assert!(!buffer.is_empty());
}
#[test]
fn test_watermarked_buffer_drain() {
let mut buffer = WatermarkedBuffer::default();
buffer.write(&[1, 2, 3, 4, 5]);
let drained = buffer.drain(3);
assert_eq!(drained, vec![1, 2, 3]);
assert_eq!(buffer.len(), 2);
}
#[test]
fn test_watermarked_buffer_drain_more_than_available() {
let mut buffer = WatermarkedBuffer::default();
buffer.write(&[1, 2, 3]);
let drained = buffer.drain(10);
assert_eq!(drained, vec![1, 2, 3]);
assert!(buffer.is_empty());
}
#[test]
fn test_watermarked_buffer_clear() {
let mut buffer = WatermarkedBuffer::default();
buffer.write(&[1, 2, 3, 4, 5]);
buffer.clear();
assert!(buffer.is_empty());
}
#[test]
fn test_watermarked_buffer_backpressure() {
let wm = BufferWatermarks::new(10, 100);
let mut buffer = WatermarkedBuffer::new(wm);
assert!(!buffer.should_backpressure());
assert!(buffer.can_write());
buffer.write(&[0u8; 50]);
assert!(!buffer.should_backpressure());
assert!(!buffer.can_write());
buffer.write(&[0u8; 50]);
assert!(buffer.should_backpressure());
}
#[test]
fn test_watermarked_buffer_pressure_level() {
let wm = BufferWatermarks::new(10, 100);
let mut buffer = WatermarkedBuffer::new(wm);
assert!((buffer.pressure_level() - 0.0).abs() < 0.001);
buffer.write(&[0u8; 50]);
assert!((buffer.pressure_level() - 0.5).abs() < 0.001);
buffer.write(&[0u8; 50]);
assert!((buffer.pressure_level() - 1.0).abs() < 0.001);
}
#[test]
fn test_watermarked_buffer_watermarks_getter() {
let wm = BufferWatermarks::new(100, 1000);
let buffer = WatermarkedBuffer::new(wm);
let retrieved = buffer.watermarks();
assert_eq!(retrieved.low, 100);
assert_eq!(retrieved.high, 1000);
}
#[test]
fn test_falsify_buffer_limit_signaling() {
let wm = BufferWatermarks::new(10, 100);
let mut buffer = WatermarkedBuffer::new(wm);
let mut backpressure_signaled_at = None;
for i in 0..200 {
let chunk = [i as u8; 10]; buffer.write(&chunk);
if buffer.should_backpressure() && backpressure_signaled_at.is_none() {
backpressure_signaled_at = Some(buffer.len());
}
}
assert!(
backpressure_signaled_at.is_some(),
"FALSIFICATION FAILED: Buffer accepted 2000 bytes without signaling backpressure"
);
let signal_point = backpressure_signaled_at.unwrap();
assert!(
signal_point >= 100,
"FALSIFICATION FAILED: Backpressure signaled too early at {} bytes (high=100)",
signal_point
);
assert_eq!(buffer.len(), 2000, "Buffer should accept all writes (signaling is advisory)");
assert!(
(buffer.pressure_level() - 1.0).abs() < 0.001,
"Pressure level should cap at 1.0 when over limit"
);
}
#[test]
fn test_falsify_drain_restores_writability() {
let wm = BufferWatermarks::new(10, 100);
let mut buffer = WatermarkedBuffer::new(wm);
buffer.write(&[0u8; 150]);
assert!(buffer.should_backpressure());
assert!(!buffer.can_write());
buffer.drain(145); assert_eq!(buffer.len(), 5);
assert!(
buffer.can_write(),
"FALSIFICATION FAILED: Draining below low watermark did not restore writability"
);
assert!(
!buffer.should_backpressure(),
"FALSIFICATION FAILED: Draining below low watermark did not clear backpressure"
);
}
}