Skip to main content

oximedia_graph/
port_buffer.rs

1//! Port buffering strategies for graph connections.
2//!
3//! This module provides configurable buffering between graph ports,
4//! controlling how many frames can be queued between an output port
5//! and its connected input port. Different strategies allow trade-offs
6//! between latency and throughput.
7
8use std::collections::VecDeque;
9use std::fmt;
10
/// How frames are queued on the connection between two graph ports.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum BufferStrategy {
    /// Synchronous hand-off: frames are passed straight through, unbuffered.
    Direct,
    /// Ring buffer with a fixed upper bound on queued frames.
    Ring {
        /// Largest number of frames the ring may hold at once.
        capacity: usize,
    },
    /// Queue with no upper bound; it grows as frames arrive.
    Unbounded,
    /// Double buffering: the producer writes to the back while the
    /// consumer reads from the front.
    DoubleBuffer,
}
26
27impl BufferStrategy {
28    /// Get the capacity hint for this strategy (0 for unbounded/direct).
29    pub fn capacity_hint(&self) -> usize {
30        match self {
31            Self::Direct => 0,
32            Self::Ring { capacity } => *capacity,
33            Self::Unbounded => 0,
34            Self::DoubleBuffer => 2,
35        }
36    }
37}
38
39impl Default for BufferStrategy {
40    fn default() -> Self {
41        Self::Ring { capacity: 4 }
42    }
43}
44
45impl fmt::Display for BufferStrategy {
46    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
47        match self {
48            Self::Direct => write!(f, "Direct"),
49            Self::Ring { capacity } => write!(f, "Ring({capacity})"),
50            Self::Unbounded => write!(f, "Unbounded"),
51            Self::DoubleBuffer => write!(f, "DoubleBuffer"),
52        }
53    }
54}
55
/// Coarse fill level of a port buffer.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum BufferStatus {
    /// Nothing is queued; a consumer will block or get `None`.
    Empty,
    /// At least one frame is queued and there is still room for more.
    Partial,
    /// The buffer holds as many frames as its capacity allows.
    Full,
}
66
67impl fmt::Display for BufferStatus {
68    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
69        match self {
70            Self::Empty => write!(f, "Empty"),
71            Self::Partial => write!(f, "Partial"),
72            Self::Full => write!(f, "Full"),
73        }
74    }
75}
76
/// Lightweight handle describing one frame travelling through a port buffer.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct FrameToken {
    /// Sequence number of the frame.
    pub sequence: u64,
    /// Presentation timestamp, in microseconds.
    pub pts_us: i64,
    /// Size of the frame data, in bytes.
    pub size: usize,
}
87
88impl FrameToken {
89    /// Create a new frame token.
90    pub fn new(sequence: u64, pts_us: i64, size: usize) -> Self {
91        Self {
92            sequence,
93            pts_us,
94            size,
95        }
96    }
97}
98
99impl fmt::Display for FrameToken {
100    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
101        write!(
102            f,
103            "Frame(seq={}, pts={}us, {}B)",
104            self.sequence, self.pts_us, self.size
105        )
106    }
107}
108
109/// A buffer that sits between an output port and an input port.
110pub struct PortBuffer {
111    /// The buffering strategy.
112    strategy: BufferStrategy,
113    /// The queue of frame tokens.
114    queue: VecDeque<FrameToken>,
115    /// Maximum capacity (0 = unbounded).
116    max_capacity: usize,
117    /// Total frames pushed since creation.
118    total_pushed: u64,
119    /// Total frames popped since creation.
120    total_popped: u64,
121    /// Number of frames dropped due to overflow.
122    dropped: u64,
123    /// Label for debugging.
124    label: String,
125}
126
127impl PortBuffer {
128    /// Create a new port buffer with the given strategy.
129    pub fn new(strategy: BufferStrategy, label: &str) -> Self {
130        let max_capacity = match strategy {
131            BufferStrategy::Direct => 1,
132            BufferStrategy::Ring { capacity } => capacity,
133            BufferStrategy::Unbounded => 0,
134            BufferStrategy::DoubleBuffer => 2,
135        };
136        Self {
137            strategy,
138            queue: VecDeque::with_capacity(max_capacity.min(1024)),
139            max_capacity,
140            total_pushed: 0,
141            total_popped: 0,
142            dropped: 0,
143            label: label.to_string(),
144        }
145    }
146
147    /// Push a frame token into the buffer.
148    ///
149    /// Returns `true` if the frame was accepted, `false` if it was dropped
150    /// due to overflow (for bounded strategies).
151    pub fn push(&mut self, token: FrameToken) -> bool {
152        if self.max_capacity > 0 && self.queue.len() >= self.max_capacity {
153            // For ring buffers, drop the oldest frame
154            if matches!(self.strategy, BufferStrategy::Ring { .. }) {
155                self.queue.pop_front();
156                self.dropped += 1;
157            } else {
158                self.dropped += 1;
159                return false;
160            }
161        }
162        self.queue.push_back(token);
163        self.total_pushed += 1;
164        true
165    }
166
167    /// Pop the next frame token from the buffer.
168    pub fn pop(&mut self) -> Option<FrameToken> {
169        let token = self.queue.pop_front();
170        if token.is_some() {
171            self.total_popped += 1;
172        }
173        token
174    }
175
176    /// Peek at the next frame token without removing it.
177    pub fn peek(&self) -> Option<&FrameToken> {
178        self.queue.front()
179    }
180
181    /// Get the current number of frames in the buffer.
182    pub fn len(&self) -> usize {
183        self.queue.len()
184    }
185
186    /// Check whether the buffer is empty.
187    pub fn is_empty(&self) -> bool {
188        self.queue.is_empty()
189    }
190
191    /// Get the current buffer status.
192    pub fn status(&self) -> BufferStatus {
193        if self.queue.is_empty() {
194            BufferStatus::Empty
195        } else if self.max_capacity > 0 && self.queue.len() >= self.max_capacity {
196            BufferStatus::Full
197        } else {
198            BufferStatus::Partial
199        }
200    }
201
202    /// Get the buffering strategy.
203    pub fn strategy(&self) -> BufferStrategy {
204        self.strategy
205    }
206
207    /// Get the maximum capacity (0 for unbounded).
208    pub fn max_capacity(&self) -> usize {
209        self.max_capacity
210    }
211
212    /// Get the total number of frames pushed.
213    pub fn total_pushed(&self) -> u64 {
214        self.total_pushed
215    }
216
217    /// Get the total number of frames popped.
218    pub fn total_popped(&self) -> u64 {
219        self.total_popped
220    }
221
222    /// Get the number of dropped frames.
223    pub fn dropped(&self) -> u64 {
224        self.dropped
225    }
226
227    /// Get the buffer label.
228    pub fn label(&self) -> &str {
229        &self.label
230    }
231
232    /// Clear all buffered frames.
233    pub fn clear(&mut self) {
234        self.queue.clear();
235    }
236
237    /// Drain all frames from the buffer as a vector.
238    pub fn drain_all(&mut self) -> Vec<FrameToken> {
239        let items: Vec<_> = self.queue.drain(..).collect();
240        self.total_popped += items.len() as u64;
241        items
242    }
243
244    /// Get fill ratio (0.0 to 1.0). Returns 0.0 for unbounded buffers.
245    #[allow(clippy::cast_precision_loss)]
246    pub fn fill_ratio(&self) -> f64 {
247        if self.max_capacity == 0 {
248            return 0.0;
249        }
250        self.queue.len() as f64 / self.max_capacity as f64
251    }
252}
253
254impl fmt::Display for PortBuffer {
255    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
256        write!(
257            f,
258            "PortBuffer[{}]: {} ({}/{} frames, {} dropped)",
259            self.label,
260            self.strategy,
261            self.queue.len(),
262            if self.max_capacity > 0 {
263                self.max_capacity.to_string()
264            } else {
265                "inf".to_string()
266            },
267            self.dropped
268        )
269    }
270}
271
#[cfg(test)]
mod tests {
    //! Unit tests for the buffering strategies, frame tokens and `PortBuffer`.

    use super::*;

    #[test]
    fn test_buffer_strategy_default() {
        let s = BufferStrategy::default();
        assert_eq!(s, BufferStrategy::Ring { capacity: 4 });
    }

    #[test]
    fn test_buffer_strategy_display() {
        assert_eq!(format!("{}", BufferStrategy::Direct), "Direct");
        assert_eq!(
            format!("{}", BufferStrategy::Ring { capacity: 8 }),
            "Ring(8)"
        );
        assert_eq!(format!("{}", BufferStrategy::Unbounded), "Unbounded");
        assert_eq!(format!("{}", BufferStrategy::DoubleBuffer), "DoubleBuffer");
    }

    #[test]
    fn test_capacity_hint() {
        assert_eq!(BufferStrategy::Direct.capacity_hint(), 0);
        assert_eq!(BufferStrategy::Ring { capacity: 16 }.capacity_hint(), 16);
        assert_eq!(BufferStrategy::Unbounded.capacity_hint(), 0);
        assert_eq!(BufferStrategy::DoubleBuffer.capacity_hint(), 2);
    }

    #[test]
    fn test_frame_token_new() {
        let tok = FrameToken::new(42, 1_000_000, 4096);
        assert_eq!(tok.sequence, 42);
        assert_eq!(tok.pts_us, 1_000_000);
        assert_eq!(tok.size, 4096);
    }

    #[test]
    fn test_frame_token_display() {
        let tok = FrameToken::new(1, 500, 256);
        assert_eq!(format!("{tok}"), "Frame(seq=1, pts=500us, 256B)");
    }

    #[test]
    fn test_port_buffer_push_pop() {
        let mut buf = PortBuffer::new(BufferStrategy::Ring { capacity: 4 }, "test");
        assert!(buf.push(FrameToken::new(0, 0, 100)));
        assert!(buf.push(FrameToken::new(1, 1000, 100)));
        assert_eq!(buf.len(), 2);
        let tok = buf.pop().expect("pop should succeed");
        assert_eq!(tok.sequence, 0);
        assert_eq!(buf.len(), 1);
    }

    #[test]
    fn test_port_buffer_pop_empty() {
        let mut buf = PortBuffer::new(BufferStrategy::Ring { capacity: 4 }, "empty");
        assert!(buf.pop().is_none());
        // A failed pop must not be counted.
        assert_eq!(buf.total_popped(), 0);
    }

    #[test]
    fn test_port_buffer_peek() {
        let mut buf = PortBuffer::new(BufferStrategy::Ring { capacity: 4 }, "test");
        assert!(buf.peek().is_none());
        buf.push(FrameToken::new(5, 0, 10));
        assert_eq!(buf.peek().expect("peek should succeed").sequence, 5);
        assert_eq!(buf.len(), 1); // Peek doesn't consume
    }

    #[test]
    fn test_port_buffer_ring_overflow_drops_oldest() {
        let mut buf = PortBuffer::new(BufferStrategy::Ring { capacity: 2 }, "ring");
        buf.push(FrameToken::new(0, 0, 10));
        buf.push(FrameToken::new(1, 0, 10));
        buf.push(FrameToken::new(2, 0, 10)); // Should drop seq=0
        assert_eq!(buf.len(), 2);
        assert_eq!(buf.dropped(), 1);
        assert_eq!(buf.pop().expect("pop should succeed").sequence, 1);
    }

    #[test]
    fn test_port_buffer_direct_rejects_when_full() {
        let mut buf = PortBuffer::new(BufferStrategy::Direct, "direct");
        assert_eq!(buf.max_capacity(), 1);
        assert!(buf.push(FrameToken::new(0, 0, 10)));
        assert_eq!(buf.status(), BufferStatus::Full);
        // Non-ring strategies keep what they hold and reject the newcomer.
        assert!(!buf.push(FrameToken::new(1, 0, 10)));
        assert_eq!(buf.len(), 1);
        assert_eq!(buf.dropped(), 1);
        assert_eq!(buf.total_pushed(), 1);
        assert_eq!(buf.pop().expect("pop should succeed").sequence, 0);
    }

    #[test]
    fn test_port_buffer_double_buffer_rejects_third() {
        let mut buf = PortBuffer::new(BufferStrategy::DoubleBuffer, "double");
        assert_eq!(buf.max_capacity(), 2);
        assert!(buf.push(FrameToken::new(0, 0, 10)));
        assert!(buf.push(FrameToken::new(1, 0, 10)));
        assert!(!buf.push(FrameToken::new(2, 0, 10)));
        assert_eq!(buf.len(), 2);
        assert_eq!(buf.dropped(), 1);
        assert_eq!(buf.pop().expect("pop should succeed").sequence, 0);
    }

    #[test]
    fn test_port_buffer_unbounded() {
        let mut buf = PortBuffer::new(BufferStrategy::Unbounded, "unb");
        for i in 0..100 {
            assert!(buf.push(FrameToken::new(i, 0, 10)));
        }
        assert_eq!(buf.len(), 100);
        assert_eq!(buf.dropped(), 0);
    }

    #[test]
    fn test_port_buffer_status() {
        let mut buf = PortBuffer::new(BufferStrategy::Ring { capacity: 2 }, "s");
        assert_eq!(buf.status(), BufferStatus::Empty);
        buf.push(FrameToken::new(0, 0, 1));
        assert_eq!(buf.status(), BufferStatus::Partial);
        buf.push(FrameToken::new(1, 0, 1));
        assert_eq!(buf.status(), BufferStatus::Full);
    }

    #[test]
    fn test_port_buffer_clear() {
        let mut buf = PortBuffer::new(BufferStrategy::Ring { capacity: 8 }, "c");
        buf.push(FrameToken::new(0, 0, 1));
        buf.push(FrameToken::new(1, 0, 1));
        buf.clear();
        assert!(buf.is_empty());
    }

    #[test]
    fn test_port_buffer_drain_all() {
        let mut buf = PortBuffer::new(BufferStrategy::Ring { capacity: 8 }, "d");
        buf.push(FrameToken::new(0, 0, 1));
        buf.push(FrameToken::new(1, 0, 1));
        buf.push(FrameToken::new(2, 0, 1));
        let drained = buf.drain_all();
        assert_eq!(drained.len(), 3);
        assert!(buf.is_empty());
        assert_eq!(buf.total_popped(), 3);
    }

    #[test]
    fn test_port_buffer_fill_ratio() {
        let mut buf = PortBuffer::new(BufferStrategy::Ring { capacity: 4 }, "f");
        assert!((buf.fill_ratio() - 0.0).abs() < f64::EPSILON);
        buf.push(FrameToken::new(0, 0, 1));
        buf.push(FrameToken::new(1, 0, 1));
        assert!((buf.fill_ratio() - 0.5).abs() < f64::EPSILON);
    }

    #[test]
    fn test_port_buffer_fill_ratio_unbounded() {
        let buf = PortBuffer::new(BufferStrategy::Unbounded, "u");
        assert!((buf.fill_ratio() - 0.0).abs() < f64::EPSILON);
    }

    #[test]
    fn test_port_buffer_total_counters() {
        let mut buf = PortBuffer::new(BufferStrategy::Ring { capacity: 8 }, "tc");
        buf.push(FrameToken::new(0, 0, 1));
        buf.push(FrameToken::new(1, 0, 1));
        buf.pop();
        assert_eq!(buf.total_pushed(), 2);
        assert_eq!(buf.total_popped(), 1);
    }

    #[test]
    fn test_port_buffer_accessors() {
        let buf = PortBuffer::new(BufferStrategy::Ring { capacity: 8 }, "acc");
        assert_eq!(buf.label(), "acc");
        assert_eq!(buf.strategy(), BufferStrategy::Ring { capacity: 8 });
        assert_eq!(buf.max_capacity(), 8);
        assert_eq!(
            PortBuffer::new(BufferStrategy::Unbounded, "u").max_capacity(),
            0
        );
    }

    #[test]
    fn test_port_buffer_display() {
        let mut ring = PortBuffer::new(BufferStrategy::Ring { capacity: 4 }, "lbl");
        ring.push(FrameToken::new(0, 0, 1));
        assert_eq!(
            format!("{ring}"),
            "PortBuffer[lbl]: Ring(4) (1/4 frames, 0 dropped)"
        );
        let unbounded = PortBuffer::new(BufferStrategy::Unbounded, "u");
        assert_eq!(
            format!("{unbounded}"),
            "PortBuffer[u]: Unbounded (0/inf frames, 0 dropped)"
        );
    }

    #[test]
    fn test_buffer_status_display() {
        assert_eq!(format!("{}", BufferStatus::Empty), "Empty");
        assert_eq!(format!("{}", BufferStatus::Partial), "Partial");
        assert_eq!(format!("{}", BufferStatus::Full), "Full");
    }
}