Skip to main content

trueno/brick/
buffer.rs

1//! Buffer Management with Flow Control
2//!
3//! Two-tier watermarks for back-pressure control.
4
5// ----------------------------------------------------------------------------
6// AWP-01: Two-Tier Buffer Watermarks
7// ----------------------------------------------------------------------------
8
/// Two-tier buffer watermarks for back-pressure control.
///
/// The pair splits a buffer's fill level into three zones:
///
/// * below `low` — writing may (re)start, see `can_write`;
/// * at or above `high` — back-pressure applies, see `should_backpressure`;
/// * in between — neither predicate is true.
///
/// Values can be compared with `==`, so a configuration can be checked
/// against an expected one in a single assertion.
///
/// # Example
/// ```rust
/// use trueno::brick::BufferWatermarks;
///
/// let wm = BufferWatermarks::default();
/// assert!(!wm.should_backpressure(1000));  // Below high watermark
/// assert!(wm.should_backpressure(10000));  // Above high watermark
/// ```
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct BufferWatermarks {
    /// Low watermark: writing may resume once the buffer drops below this.
    pub low: usize,
    /// High watermark: back-pressure applies once the buffer reaches this.
    pub high: usize,
}
26
27impl Default for BufferWatermarks {
28    fn default() -> Self {
29        Self {
30            low: 1024,      // 1KB
31            high: 8 * 1024, // 8KB
32        }
33    }
34}
35
36impl BufferWatermarks {
37    /// Create new watermarks.
38    ///
39    /// # Panics
40    /// Panics if low >= high.
41    pub fn new(low: usize, high: usize) -> Self {
42        assert!(low < high, "low watermark must be less than high");
43        Self { low, high }
44    }
45
46    /// Check if back-pressure should be applied.
47    #[must_use]
48    pub fn should_backpressure(&self, current: usize) -> bool {
49        current >= self.high
50    }
51
52    /// Check if writing can resume.
53    #[must_use]
54    pub fn can_write(&self, current: usize) -> bool {
55        current < self.low
56    }
57
58    /// Get pressure level (0.0 = empty, 1.0 = at high watermark).
59    #[must_use]
60    pub fn pressure_level(&self, current: usize) -> f64 {
61        (current as f64 / self.high as f64).min(1.0)
62    }
63}
64
65/// Buffer with watermark-based flow control.
66#[derive(Debug)]
67pub struct WatermarkedBuffer {
68    data: Vec<u8>,
69    watermarks: BufferWatermarks,
70}
71
72impl WatermarkedBuffer {
73    /// Create a new watermarked buffer.
74    pub fn new(watermarks: BufferWatermarks) -> Self {
75        Self { data: Vec::with_capacity(watermarks.high), watermarks }
76    }
77
78    /// Check if back-pressure should be applied.
79    #[must_use]
80    pub fn should_backpressure(&self) -> bool {
81        self.watermarks.should_backpressure(self.data.len())
82    }
83
84    /// Check if writing can resume.
85    #[must_use]
86    pub fn can_write(&self) -> bool {
87        self.watermarks.can_write(self.data.len())
88    }
89
90    /// Get current buffer length.
91    #[must_use]
92    pub fn len(&self) -> usize {
93        self.data.len()
94    }
95
96    /// Check if buffer is empty.
97    #[must_use]
98    pub fn is_empty(&self) -> bool {
99        self.data.is_empty()
100    }
101
102    /// Write data to the buffer.
103    pub fn write(&mut self, data: &[u8]) {
104        contract_pre_write!();
105        self.data.extend_from_slice(data);
106    }
107
108    /// Drain data from the buffer.
109    pub fn drain(&mut self, amount: usize) -> Vec<u8> {
110        let amount = amount.min(self.data.len());
111        self.data.drain(..amount).collect()
112    }
113
114    /// Clear the buffer.
115    pub fn clear(&mut self) {
116        self.data.clear();
117    }
118
119    /// Get the watermarks configuration.
120    #[must_use]
121    pub fn watermarks(&self) -> BufferWatermarks {
122        self.watermarks
123    }
124
125    /// Get current pressure level.
126    #[must_use]
127    pub fn pressure_level(&self) -> f64 {
128        self.watermarks.pressure_level(self.data.len())
129    }
130}
131
132impl Default for WatermarkedBuffer {
133    fn default() -> Self {
134        Self::new(BufferWatermarks::default())
135    }
136}
137
138#[cfg(test)]
139mod tests {
140    use super::*;
141
142    #[test]
143    fn test_buffer_watermarks_default() {
144        let wm = BufferWatermarks::default();
145        assert_eq!(wm.low, 1024);
146        assert_eq!(wm.high, 8 * 1024);
147    }
148
149    #[test]
150    fn test_buffer_watermarks_new() {
151        let wm = BufferWatermarks::new(100, 1000);
152        assert_eq!(wm.low, 100);
153        assert_eq!(wm.high, 1000);
154    }
155
156    #[test]
157    #[should_panic(expected = "low watermark must be less than high")]
158    fn test_buffer_watermarks_invalid() {
159        BufferWatermarks::new(1000, 100);
160    }
161
162    #[test]
163    fn test_buffer_watermarks_backpressure() {
164        let wm = BufferWatermarks::new(100, 1000);
165        assert!(!wm.should_backpressure(500));
166        assert!(!wm.should_backpressure(999));
167        assert!(wm.should_backpressure(1000));
168        assert!(wm.should_backpressure(1500));
169    }
170
171    #[test]
172    fn test_buffer_watermarks_can_write() {
173        let wm = BufferWatermarks::new(100, 1000);
174        assert!(wm.can_write(50));
175        assert!(wm.can_write(99));
176        assert!(!wm.can_write(100));
177        assert!(!wm.can_write(500));
178    }
179
180    #[test]
181    fn test_buffer_watermarks_pressure_level() {
182        let wm = BufferWatermarks::new(100, 1000);
183        assert!((wm.pressure_level(0) - 0.0).abs() < 0.001);
184        assert!((wm.pressure_level(500) - 0.5).abs() < 0.001);
185        assert!((wm.pressure_level(1000) - 1.0).abs() < 0.001);
186        assert!((wm.pressure_level(2000) - 1.0).abs() < 0.001); // Capped at 1.0
187    }
188
189    #[test]
190    fn test_watermarked_buffer_new() {
191        let wm = BufferWatermarks::new(100, 1000);
192        let buffer = WatermarkedBuffer::new(wm);
193        assert!(buffer.is_empty());
194        assert_eq!(buffer.len(), 0);
195    }
196
197    #[test]
198    fn test_watermarked_buffer_write() {
199        let mut buffer = WatermarkedBuffer::default();
200        buffer.write(&[1, 2, 3, 4, 5]);
201        assert_eq!(buffer.len(), 5);
202        assert!(!buffer.is_empty());
203    }
204
205    #[test]
206    fn test_watermarked_buffer_drain() {
207        let mut buffer = WatermarkedBuffer::default();
208        buffer.write(&[1, 2, 3, 4, 5]);
209        let drained = buffer.drain(3);
210        assert_eq!(drained, vec![1, 2, 3]);
211        assert_eq!(buffer.len(), 2);
212    }
213
214    #[test]
215    fn test_watermarked_buffer_drain_more_than_available() {
216        let mut buffer = WatermarkedBuffer::default();
217        buffer.write(&[1, 2, 3]);
218        let drained = buffer.drain(10);
219        assert_eq!(drained, vec![1, 2, 3]);
220        assert!(buffer.is_empty());
221    }
222
223    #[test]
224    fn test_watermarked_buffer_clear() {
225        let mut buffer = WatermarkedBuffer::default();
226        buffer.write(&[1, 2, 3, 4, 5]);
227        buffer.clear();
228        assert!(buffer.is_empty());
229    }
230
231    #[test]
232    fn test_watermarked_buffer_backpressure() {
233        let wm = BufferWatermarks::new(10, 100);
234        let mut buffer = WatermarkedBuffer::new(wm);
235
236        assert!(!buffer.should_backpressure());
237        assert!(buffer.can_write());
238
239        buffer.write(&[0u8; 50]);
240        assert!(!buffer.should_backpressure());
241        assert!(!buffer.can_write());
242
243        buffer.write(&[0u8; 50]);
244        assert!(buffer.should_backpressure());
245    }
246
247    #[test]
248    fn test_watermarked_buffer_pressure_level() {
249        let wm = BufferWatermarks::new(10, 100);
250        let mut buffer = WatermarkedBuffer::new(wm);
251
252        assert!((buffer.pressure_level() - 0.0).abs() < 0.001);
253
254        buffer.write(&[0u8; 50]);
255        assert!((buffer.pressure_level() - 0.5).abs() < 0.001);
256
257        buffer.write(&[0u8; 50]);
258        assert!((buffer.pressure_level() - 1.0).abs() < 0.001);
259    }
260
261    #[test]
262    fn test_watermarked_buffer_watermarks_getter() {
263        let wm = BufferWatermarks::new(100, 1000);
264        let buffer = WatermarkedBuffer::new(wm);
265        let retrieved = buffer.watermarks();
266        assert_eq!(retrieved.low, 100);
267        assert_eq!(retrieved.high, 1000);
268    }
269
270    /// FALSIFICATION TEST (Section 4.3)
271    ///
272    /// Verifies the buffer's behavior when pushed past its high watermark.
273    /// The WatermarkedBuffer is a SIGNALING mechanism, not a blocking mechanism.
274    /// It will accept writes beyond the limit but MUST correctly signal backpressure.
275    ///
276    /// Failure mode: If this test passes but the buffer doesn't signal,
277    /// the caller would never know to stop writing, leading to OOM.
278    #[test]
279    fn test_falsify_buffer_limit_signaling() {
280        let wm = BufferWatermarks::new(10, 100);
281        let mut buffer = WatermarkedBuffer::new(wm);
282
283        // Write incrementally and verify signaling at each step
284        let mut backpressure_signaled_at = None;
285
286        for i in 0..200 {
287            let chunk = [i as u8; 10]; // 10 bytes per write
288            buffer.write(&chunk);
289
290            if buffer.should_backpressure() && backpressure_signaled_at.is_none() {
291                backpressure_signaled_at = Some(buffer.len());
292            }
293        }
294
295        // CRITICAL ASSERTION: Backpressure MUST have been signaled
296        assert!(
297            backpressure_signaled_at.is_some(),
298            "FALSIFICATION FAILED: Buffer accepted 2000 bytes without signaling backpressure"
299        );
300
301        // Verify it was signaled at the right threshold (at or above high watermark)
302        let signal_point = backpressure_signaled_at.unwrap();
303        assert!(
304            signal_point >= 100,
305            "FALSIFICATION FAILED: Backpressure signaled too early at {} bytes (high=100)",
306            signal_point
307        );
308
309        // Verify buffer actually accepted all the data (it's non-blocking)
310        assert_eq!(buffer.len(), 2000, "Buffer should accept all writes (signaling is advisory)");
311
312        // Verify pressure level is capped at 1.0 even when way over
313        assert!(
314            (buffer.pressure_level() - 1.0).abs() < 0.001,
315            "Pressure level should cap at 1.0 when over limit"
316        );
317    }
318
319    /// FALSIFICATION TEST: Verify drain restores write capability
320    #[test]
321    fn test_falsify_drain_restores_writability() {
322        let wm = BufferWatermarks::new(10, 100);
323        let mut buffer = WatermarkedBuffer::new(wm);
324
325        // Fill past high watermark
326        buffer.write(&[0u8; 150]);
327        assert!(buffer.should_backpressure());
328        assert!(!buffer.can_write());
329
330        // Drain to below low watermark
331        buffer.drain(145); // Should leave 5 bytes
332        assert_eq!(buffer.len(), 5);
333
334        // CRITICAL: Must restore write capability
335        assert!(
336            buffer.can_write(),
337            "FALSIFICATION FAILED: Draining below low watermark did not restore writability"
338        );
339        assert!(
340            !buffer.should_backpressure(),
341            "FALSIFICATION FAILED: Draining below low watermark did not clear backpressure"
342        );
343    }
344}