/// Low/high thresholds against which a buffer's current fill level is compared.
///
/// Reaching `high` signals backpressure, while being strictly below `low` signals that
/// writing may proceed. Fill levels in `low..high` trigger neither signal.
#[derive(Debug, Clone, Copy)]
pub struct BufferWatermarks {
    /// Fill level strictly below which [`can_write`](Self::can_write) reports `true`.
    pub low: usize,
    /// Fill level at or above which [`should_backpressure`](Self::should_backpressure)
    /// reports `true`.
    pub high: usize,
}

impl Default for BufferWatermarks {
    /// Returns watermarks with `low = 1024` and `high = 8192`.
    fn default() -> Self {
        const KIB: usize = 1024;
        BufferWatermarks {
            low: KIB,
            high: 8 * KIB,
        }
    }
}

impl BufferWatermarks {
    /// Creates a watermark pair from explicit thresholds.
    ///
    /// # Panics
    ///
    /// Panics if `low` is not strictly less than `high`.
    pub fn new(low: usize, high: usize) -> Self {
        assert!(high > low, "low watermark must be less than high");
        BufferWatermarks { low, high }
    }

    /// Returns `true` once `current` has reached or passed the high watermark.
    #[must_use]
    pub fn should_backpressure(&self, current: usize) -> bool {
        self.high <= current
    }

    /// Returns `true` while `current` is strictly below the low watermark.
    #[must_use]
    pub fn can_write(&self, current: usize) -> bool {
        self.low > current
    }

    /// Returns `current / high` as a fraction, capped at `1.0`.
    #[must_use]
    pub fn pressure_level(&self, current: usize) -> f64 {
        let ratio = current as f64 / self.high as f64;
        // `f64::min` returns the non-NaN operand, so a zero `high` (reachable through the
        // public fields) still produces 1.0 rather than NaN or infinity.
        f64::min(ratio, 1.0)
    }
}
64
65#[derive(Debug)]
67pub struct WatermarkedBuffer {
68 data: Vec<u8>,
69 watermarks: BufferWatermarks,
70}
71
72impl WatermarkedBuffer {
73 pub fn new(watermarks: BufferWatermarks) -> Self {
75 Self { data: Vec::with_capacity(watermarks.high), watermarks }
76 }
77
78 #[must_use]
80 pub fn should_backpressure(&self) -> bool {
81 self.watermarks.should_backpressure(self.data.len())
82 }
83
84 #[must_use]
86 pub fn can_write(&self) -> bool {
87 self.watermarks.can_write(self.data.len())
88 }
89
90 #[must_use]
92 pub fn len(&self) -> usize {
93 self.data.len()
94 }
95
96 #[must_use]
98 pub fn is_empty(&self) -> bool {
99 self.data.is_empty()
100 }
101
102 pub fn write(&mut self, data: &[u8]) {
104 contract_pre_write!();
105 self.data.extend_from_slice(data);
106 }
107
108 pub fn drain(&mut self, amount: usize) -> Vec<u8> {
110 let amount = amount.min(self.data.len());
111 self.data.drain(..amount).collect()
112 }
113
114 pub fn clear(&mut self) {
116 self.data.clear();
117 }
118
119 #[must_use]
121 pub fn watermarks(&self) -> BufferWatermarks {
122 self.watermarks
123 }
124
125 #[must_use]
127 pub fn pressure_level(&self) -> f64 {
128 self.watermarks.pressure_level(self.data.len())
129 }
130}
131
132impl Default for WatermarkedBuffer {
133 fn default() -> Self {
134 Self::new(BufferWatermarks::default())
135 }
136}
137
#[cfg(test)]
mod tests {
    use super::*;

    /// Tolerance used when comparing floating-point pressure levels.
    const EPSILON: f64 = 0.001;

    /// Returns `true` when `actual` lies within [`EPSILON`] of `expected`.
    fn approx_eq(actual: f64, expected: f64) -> bool {
        (actual - expected).abs() < EPSILON
    }

    /// Builds a buffer with the small 10/100 watermarks shared by several tests.
    fn small_buffer() -> WatermarkedBuffer {
        WatermarkedBuffer::new(BufferWatermarks::new(10, 100))
    }

    /// The default watermarks are 1024 (low) and 8 * 1024 (high).
    #[test]
    fn test_buffer_watermarks_default() {
        let BufferWatermarks { low, high } = BufferWatermarks::default();
        assert_eq!(low, 1024);
        assert_eq!(high, 8 * 1024);
    }

    /// `new` stores the thresholds it is given.
    #[test]
    fn test_buffer_watermarks_new() {
        let BufferWatermarks { low, high } = BufferWatermarks::new(100, 1000);
        assert_eq!(low, 100);
        assert_eq!(high, 1000);
    }

    /// `new` panics when `low` is not below `high`.
    #[test]
    #[should_panic(expected = "low watermark must be less than high")]
    fn test_buffer_watermarks_invalid() {
        let _ = BufferWatermarks::new(1000, 100);
    }

    /// Backpressure starts exactly at the high watermark.
    #[test]
    fn test_buffer_watermarks_backpressure() {
        let marks = BufferWatermarks::new(100, 1000);
        assert!(!marks.should_backpressure(500));
        assert!(!marks.should_backpressure(999));
        assert!(marks.should_backpressure(1000));
        assert!(marks.should_backpressure(1500));
    }

    /// Writability ends exactly at the low watermark.
    #[test]
    fn test_buffer_watermarks_can_write() {
        let marks = BufferWatermarks::new(100, 1000);
        assert!(marks.can_write(50));
        assert!(marks.can_write(99));
        assert!(!marks.can_write(100));
        assert!(!marks.can_write(500));
    }

    /// Pressure scales linearly with the fill level and saturates at 1.0.
    #[test]
    fn test_buffer_watermarks_pressure_level() {
        let marks = BufferWatermarks::new(100, 1000);
        assert!(approx_eq(marks.pressure_level(0), 0.0));
        assert!(approx_eq(marks.pressure_level(500), 0.5));
        assert!(approx_eq(marks.pressure_level(1000), 1.0));
        assert!(approx_eq(marks.pressure_level(2000), 1.0));
    }

    /// A freshly constructed buffer holds nothing.
    #[test]
    fn test_watermarked_buffer_new() {
        let fresh = WatermarkedBuffer::new(BufferWatermarks::new(100, 1000));
        assert!(fresh.is_empty());
        assert_eq!(fresh.len(), 0);
    }

    /// Writing grows the buffer by the number of bytes written.
    #[test]
    fn test_watermarked_buffer_write() {
        let mut buf = WatermarkedBuffer::default();
        buf.write(&[1, 2, 3, 4, 5]);
        assert_eq!(buf.len(), 5);
        assert!(!buf.is_empty());
    }

    /// Draining returns bytes from the front and leaves the remainder.
    #[test]
    fn test_watermarked_buffer_drain() {
        let mut buf = WatermarkedBuffer::default();
        buf.write(&[1, 2, 3, 4, 5]);
        let taken = buf.drain(3);
        assert_eq!(taken, [1, 2, 3]);
        assert_eq!(buf.len(), 2);
    }

    /// An oversized drain request is clamped to what is available.
    #[test]
    fn test_watermarked_buffer_drain_more_than_available() {
        let mut buf = WatermarkedBuffer::default();
        buf.write(&[1, 2, 3]);
        let taken = buf.drain(10);
        assert_eq!(taken, [1, 2, 3]);
        assert!(buf.is_empty());
    }

    /// `clear` empties the buffer.
    #[test]
    fn test_watermarked_buffer_clear() {
        let mut buf = WatermarkedBuffer::default();
        buf.write(&[1, 2, 3, 4, 5]);
        buf.clear();
        assert!(buf.is_empty());
    }

    /// The buffer's signals follow its length: writable, then neither, then backpressure.
    #[test]
    fn test_watermarked_buffer_backpressure() {
        let mut buf = small_buffer();

        // Empty: below the low watermark.
        assert!(!buf.should_backpressure());
        assert!(buf.can_write());

        // 50 bytes: between the watermarks, so neither signal fires.
        buf.write(&[0u8; 50]);
        assert!(!buf.should_backpressure());
        assert!(!buf.can_write());

        // 100 bytes: the high watermark has been reached.
        buf.write(&[0u8; 50]);
        assert!(buf.should_backpressure());
    }

    /// The buffer's pressure level tracks its length relative to the high watermark.
    #[test]
    fn test_watermarked_buffer_pressure_level() {
        let mut buf = small_buffer();

        assert!(approx_eq(buf.pressure_level(), 0.0));

        buf.write(&[0u8; 50]);
        assert!(approx_eq(buf.pressure_level(), 0.5));

        buf.write(&[0u8; 50]);
        assert!(approx_eq(buf.pressure_level(), 1.0));
    }

    /// The getter hands back the watermarks the buffer was built with.
    #[test]
    fn test_watermarked_buffer_watermarks_getter() {
        let buf = WatermarkedBuffer::new(BufferWatermarks::new(100, 1000));
        let BufferWatermarks { low, high } = buf.watermarks();
        assert_eq!(low, 100);
        assert_eq!(high, 1000);
    }

    /// Writing far past the high watermark must raise backpressure, yet every write is
    /// still accepted and the pressure level stays capped.
    #[test]
    fn test_falsify_buffer_limit_signaling() {
        let mut buf = small_buffer();

        // Buffer length at the moment backpressure was first observed, if ever.
        let mut first_signal: Option<usize> = None;

        // 200 chunks of 10 bytes each, 2000 bytes in total.
        for fill in 0u8..200 {
            buf.write(&[fill; 10]);

            if first_signal.is_none() && buf.should_backpressure() {
                first_signal = Some(buf.len());
            }
        }

        assert!(
            first_signal.is_some(),
            "FALSIFICATION FAILED: Buffer accepted 2000 bytes without signaling backpressure"
        );

        let signal_point = first_signal.unwrap();
        assert!(
            signal_point >= 100,
            "FALSIFICATION FAILED: Backpressure signaled too early at {} bytes (high=100)",
            signal_point
        );

        assert_eq!(buf.len(), 2000, "Buffer should accept all writes (signaling is advisory)");

        assert!(
            approx_eq(buf.pressure_level(), 1.0),
            "Pressure level should cap at 1.0 when over limit"
        );
    }

    /// Draining an over-full buffer below the low watermark must flip both signals back.
    #[test]
    fn test_falsify_drain_restores_writability() {
        let mut buf = small_buffer();

        // Overfill: 150 bytes against a high watermark of 100.
        buf.write(&[0u8; 150]);
        assert!(buf.should_backpressure());
        assert!(!buf.can_write());

        // Leave 5 bytes, which is below the low watermark of 10.
        let _ = buf.drain(145);
        assert_eq!(buf.len(), 5);

        assert!(
            buf.can_write(),
            "FALSIFICATION FAILED: Draining below low watermark did not restore writability"
        );
        assert!(
            !buf.should_backpressure(),
            "FALSIFICATION FAILED: Draining below low watermark did not clear backpressure"
        );
    }
}