1use std::collections::VecDeque;
9use std::fmt;
10
/// Buffering policy selected for a port.
///
/// The enum itself is plain data. `PortBuffer` in this file turns it into a
/// frame limit and an overflow rule.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BufferStrategy {
    /// Single-slot hand-off: `PortBuffer` keeps at most one frame and rejects
    /// further pushes while that slot is occupied.
    Direct,
    /// Bounded queue that `PortBuffer` keeps fresh by evicting the oldest
    /// frame when a push arrives while it is full.
    Ring {
        /// Number of frames the ring is configured to hold.
        capacity: usize,
    },
    /// Queue without a frame limit; `PortBuffer` never drops or rejects frames
    /// under this strategy.
    Unbounded,
    /// Two-slot queue: `PortBuffer` keeps at most two frames and rejects
    /// further pushes while both slots are occupied.
    DoubleBuffer,
}
26
27impl BufferStrategy {
28 pub fn capacity_hint(&self) -> usize {
30 match self {
31 Self::Direct => 0,
32 Self::Ring { capacity } => *capacity,
33 Self::Unbounded => 0,
34 Self::DoubleBuffer => 2,
35 }
36 }
37}
38
39impl Default for BufferStrategy {
40 fn default() -> Self {
41 Self::Ring { capacity: 4 }
42 }
43}
44
45impl fmt::Display for BufferStrategy {
46 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
47 match self {
48 Self::Direct => write!(f, "Direct"),
49 Self::Ring { capacity } => write!(f, "Ring({capacity})"),
50 Self::Unbounded => write!(f, "Unbounded"),
51 Self::DoubleBuffer => write!(f, "DoubleBuffer"),
52 }
53 }
54}
55
/// Coarse occupancy level reported by `PortBuffer::status`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BufferStatus {
    /// No frames are queued.
    Empty,
    /// At least one frame is queued and the frame limit (if any) has not been
    /// reached.
    Partial,
    /// A bounded buffer holds as many frames as its limit allows.
    Full,
}
66
67impl fmt::Display for BufferStatus {
68 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
69 match self {
70 Self::Empty => write!(f, "Empty"),
71 Self::Partial => write!(f, "Partial"),
72 Self::Full => write!(f, "Full"),
73 }
74 }
75}
76
/// Lightweight descriptor of one frame travelling through a port.
///
/// The token carries metadata only; no frame payload is stored in it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FrameToken {
    /// Sequence number of the frame.
    pub sequence: u64,
    /// Timestamp of the frame. `Display` renders it with a `us` suffix, so it
    /// is presumably a presentation timestamp in microseconds (confirm with
    /// the producer).
    pub pts_us: i64,
    /// Size of the frame. `Display` renders it with a `B` suffix, so it is
    /// presumably a byte count (confirm with the producer).
    pub size: usize,
}
87
88impl FrameToken {
89 pub fn new(sequence: u64, pts_us: i64, size: usize) -> Self {
91 Self {
92 sequence,
93 pts_us,
94 size,
95 }
96 }
97}
98
99impl fmt::Display for FrameToken {
100 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
101 write!(
102 f,
103 "Frame(seq={}, pts={}us, {}B)",
104 self.sequence, self.pts_us, self.size
105 )
106 }
107}
108
109pub struct PortBuffer {
111 strategy: BufferStrategy,
113 queue: VecDeque<FrameToken>,
115 max_capacity: usize,
117 total_pushed: u64,
119 total_popped: u64,
121 dropped: u64,
123 label: String,
125}
126
127impl PortBuffer {
128 pub fn new(strategy: BufferStrategy, label: &str) -> Self {
130 let max_capacity = match strategy {
131 BufferStrategy::Direct => 1,
132 BufferStrategy::Ring { capacity } => capacity,
133 BufferStrategy::Unbounded => 0,
134 BufferStrategy::DoubleBuffer => 2,
135 };
136 Self {
137 strategy,
138 queue: VecDeque::with_capacity(max_capacity.min(1024)),
139 max_capacity,
140 total_pushed: 0,
141 total_popped: 0,
142 dropped: 0,
143 label: label.to_string(),
144 }
145 }
146
147 pub fn push(&mut self, token: FrameToken) -> bool {
152 if self.max_capacity > 0 && self.queue.len() >= self.max_capacity {
153 if matches!(self.strategy, BufferStrategy::Ring { .. }) {
155 self.queue.pop_front();
156 self.dropped += 1;
157 } else {
158 self.dropped += 1;
159 return false;
160 }
161 }
162 self.queue.push_back(token);
163 self.total_pushed += 1;
164 true
165 }
166
167 pub fn pop(&mut self) -> Option<FrameToken> {
169 let token = self.queue.pop_front();
170 if token.is_some() {
171 self.total_popped += 1;
172 }
173 token
174 }
175
176 pub fn peek(&self) -> Option<&FrameToken> {
178 self.queue.front()
179 }
180
181 pub fn len(&self) -> usize {
183 self.queue.len()
184 }
185
186 pub fn is_empty(&self) -> bool {
188 self.queue.is_empty()
189 }
190
191 pub fn status(&self) -> BufferStatus {
193 if self.queue.is_empty() {
194 BufferStatus::Empty
195 } else if self.max_capacity > 0 && self.queue.len() >= self.max_capacity {
196 BufferStatus::Full
197 } else {
198 BufferStatus::Partial
199 }
200 }
201
202 pub fn strategy(&self) -> BufferStrategy {
204 self.strategy
205 }
206
207 pub fn max_capacity(&self) -> usize {
209 self.max_capacity
210 }
211
212 pub fn total_pushed(&self) -> u64 {
214 self.total_pushed
215 }
216
217 pub fn total_popped(&self) -> u64 {
219 self.total_popped
220 }
221
222 pub fn dropped(&self) -> u64 {
224 self.dropped
225 }
226
227 pub fn label(&self) -> &str {
229 &self.label
230 }
231
232 pub fn clear(&mut self) {
234 self.queue.clear();
235 }
236
237 pub fn drain_all(&mut self) -> Vec<FrameToken> {
239 let items: Vec<_> = self.queue.drain(..).collect();
240 self.total_popped += items.len() as u64;
241 items
242 }
243
244 #[allow(clippy::cast_precision_loss)]
246 pub fn fill_ratio(&self) -> f64 {
247 if self.max_capacity == 0 {
248 return 0.0;
249 }
250 self.queue.len() as f64 / self.max_capacity as f64
251 }
252}
253
254impl fmt::Display for PortBuffer {
255 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
256 write!(
257 f,
258 "PortBuffer[{}]: {} ({}/{} frames, {} dropped)",
259 self.label,
260 self.strategy,
261 self.queue.len(),
262 if self.max_capacity > 0 {
263 self.max_capacity.to_string()
264 } else {
265 "inf".to_string()
266 },
267 self.dropped
268 )
269 }
270}
271
/// Unit tests for the buffer strategy, frame token and port buffer types.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_buffer_strategy_default() {
        let s = BufferStrategy::default();
        assert_eq!(s, BufferStrategy::Ring { capacity: 4 });
    }

    #[test]
    fn test_buffer_strategy_display() {
        assert_eq!(format!("{}", BufferStrategy::Direct), "Direct");
        assert_eq!(
            format!("{}", BufferStrategy::Ring { capacity: 8 }),
            "Ring(8)"
        );
        assert_eq!(format!("{}", BufferStrategy::Unbounded), "Unbounded");
        assert_eq!(format!("{}", BufferStrategy::DoubleBuffer), "DoubleBuffer");
    }

    #[test]
    fn test_capacity_hint() {
        assert_eq!(BufferStrategy::Direct.capacity_hint(), 0);
        assert_eq!(BufferStrategy::Ring { capacity: 16 }.capacity_hint(), 16);
        assert_eq!(BufferStrategy::Unbounded.capacity_hint(), 0);
        assert_eq!(BufferStrategy::DoubleBuffer.capacity_hint(), 2);
    }

    #[test]
    fn test_frame_token_new() {
        let tok = FrameToken::new(42, 1_000_000, 4096);
        assert_eq!(tok.sequence, 42);
        assert_eq!(tok.pts_us, 1_000_000);
        assert_eq!(tok.size, 4096);
    }

    #[test]
    fn test_frame_token_display() {
        let tok = FrameToken::new(1, 500, 256);
        assert_eq!(format!("{tok}"), "Frame(seq=1, pts=500us, 256B)");
    }

    #[test]
    fn test_port_buffer_push_pop() {
        let mut buf = PortBuffer::new(BufferStrategy::Ring { capacity: 4 }, "test");
        assert!(buf.push(FrameToken::new(0, 0, 100)));
        assert!(buf.push(FrameToken::new(1, 1000, 100)));
        assert_eq!(buf.len(), 2);
        let tok = buf.pop().expect("pop should succeed");
        assert_eq!(tok.sequence, 0);
        assert_eq!(buf.len(), 1);
    }

    #[test]
    fn test_port_buffer_peek() {
        let mut buf = PortBuffer::new(BufferStrategy::Ring { capacity: 4 }, "test");
        assert!(buf.peek().is_none());
        buf.push(FrameToken::new(5, 0, 10));
        assert_eq!(buf.peek().expect("peek should succeed").sequence, 5);
        // Peeking must not consume the frame.
        assert_eq!(buf.len(), 1);
    }

    #[test]
    fn test_port_buffer_ring_overflow_drops_oldest() {
        let mut buf = PortBuffer::new(BufferStrategy::Ring { capacity: 2 }, "ring");
        buf.push(FrameToken::new(0, 0, 10));
        buf.push(FrameToken::new(1, 0, 10));
        // The third push overflows the ring and evicts the frame with sequence 0.
        buf.push(FrameToken::new(2, 0, 10));
        assert_eq!(buf.len(), 2);
        assert_eq!(buf.dropped(), 1);
        assert_eq!(buf.pop().expect("pop should succeed").sequence, 1);
    }

    #[test]
    fn test_port_buffer_double_buffer_rejects_when_full() {
        let mut buf = PortBuffer::new(BufferStrategy::DoubleBuffer, "dbl");
        assert_eq!(buf.max_capacity(), 2);
        assert!(buf.push(FrameToken::new(0, 0, 10)));
        assert!(buf.push(FrameToken::new(1, 0, 10)));
        // Unlike a ring, a full double buffer refuses the new frame.
        assert!(!buf.push(FrameToken::new(2, 0, 10)));
        assert_eq!(buf.len(), 2);
        assert_eq!(buf.dropped(), 1);
        assert_eq!(buf.total_pushed(), 2);
        assert_eq!(buf.pop().expect("pop should succeed").sequence, 0);
    }

    #[test]
    fn test_port_buffer_direct_holds_single_frame() {
        let mut buf = PortBuffer::new(BufferStrategy::Direct, "dir");
        assert_eq!(buf.max_capacity(), 1);
        assert!(buf.push(FrameToken::new(0, 0, 10)));
        assert_eq!(buf.status(), BufferStatus::Full);
        assert!(!buf.push(FrameToken::new(1, 0, 10)));
        assert_eq!(buf.dropped(), 1);
        assert_eq!(buf.peek().expect("peek should succeed").sequence, 0);
    }

    #[test]
    fn test_port_buffer_unbounded() {
        let mut buf = PortBuffer::new(BufferStrategy::Unbounded, "unb");
        for i in 0..100 {
            assert!(buf.push(FrameToken::new(i, 0, 10)));
        }
        assert_eq!(buf.len(), 100);
        assert_eq!(buf.dropped(), 0);
        // A buffer without a limit is never reported as full.
        assert_eq!(buf.status(), BufferStatus::Partial);
    }

    #[test]
    fn test_port_buffer_status() {
        let mut buf = PortBuffer::new(BufferStrategy::Ring { capacity: 2 }, "s");
        assert_eq!(buf.status(), BufferStatus::Empty);
        buf.push(FrameToken::new(0, 0, 1));
        assert_eq!(buf.status(), BufferStatus::Partial);
        buf.push(FrameToken::new(1, 0, 1));
        assert_eq!(buf.status(), BufferStatus::Full);
    }

    #[test]
    fn test_port_buffer_clear() {
        let mut buf = PortBuffer::new(BufferStrategy::Ring { capacity: 8 }, "c");
        buf.push(FrameToken::new(0, 0, 1));
        buf.push(FrameToken::new(1, 0, 1));
        buf.clear();
        assert!(buf.is_empty());
    }

    #[test]
    fn test_port_buffer_drain_all() {
        let mut buf = PortBuffer::new(BufferStrategy::Ring { capacity: 8 }, "d");
        buf.push(FrameToken::new(0, 0, 1));
        buf.push(FrameToken::new(1, 0, 1));
        buf.push(FrameToken::new(2, 0, 1));
        let drained = buf.drain_all();
        assert_eq!(drained.len(), 3);
        assert!(buf.is_empty());
        assert_eq!(buf.total_popped(), 3);
    }

    #[test]
    fn test_port_buffer_fill_ratio() {
        let mut buf = PortBuffer::new(BufferStrategy::Ring { capacity: 4 }, "f");
        assert!((buf.fill_ratio() - 0.0).abs() < f64::EPSILON);
        buf.push(FrameToken::new(0, 0, 1));
        buf.push(FrameToken::new(1, 0, 1));
        assert!((buf.fill_ratio() - 0.5).abs() < f64::EPSILON);
    }

    #[test]
    fn test_port_buffer_fill_ratio_unbounded() {
        let buf = PortBuffer::new(BufferStrategy::Unbounded, "u");
        assert!((buf.fill_ratio() - 0.0).abs() < f64::EPSILON);
    }

    #[test]
    fn test_port_buffer_total_counters() {
        let mut buf = PortBuffer::new(BufferStrategy::Ring { capacity: 8 }, "tc");
        buf.push(FrameToken::new(0, 0, 1));
        buf.push(FrameToken::new(1, 0, 1));
        buf.pop();
        assert_eq!(buf.total_pushed(), 2);
        assert_eq!(buf.total_popped(), 1);
    }

    #[test]
    fn test_port_buffer_accessors() {
        let buf = PortBuffer::new(BufferStrategy::Ring { capacity: 8 }, "acc");
        assert_eq!(buf.strategy(), BufferStrategy::Ring { capacity: 8 });
        assert_eq!(buf.max_capacity(), 8);
        assert_eq!(buf.label(), "acc");
    }

    #[test]
    fn test_port_buffer_display() {
        let mut ring = PortBuffer::new(BufferStrategy::Ring { capacity: 4 }, "cam");
        ring.push(FrameToken::new(0, 0, 1));
        assert_eq!(
            format!("{ring}"),
            "PortBuffer[cam]: Ring(4) (1/4 frames, 0 dropped)"
        );

        let unbounded = PortBuffer::new(BufferStrategy::Unbounded, "u");
        assert_eq!(
            format!("{unbounded}"),
            "PortBuffer[u]: Unbounded (0/inf frames, 0 dropped)"
        );
    }

    #[test]
    fn test_buffer_status_display() {
        assert_eq!(format!("{}", BufferStatus::Empty), "Empty");
        assert_eq!(format!("{}", BufferStatus::Partial), "Partial");
        assert_eq!(format!("{}", BufferStatus::Full), "Full");
    }
}