1use std::collections::VecDeque;
21use std::sync::Mutex;
22
23use crate::Error;
24use crate::codec::DRAIN_FRAME_OVERHEAD;
25
/// Lock-protected state shared by all handles to a [`Queue`].
struct QueueInner {
    /// Queued frame payloads in FIFO order.
    frames: VecDeque<Vec<u8>>,
    /// Total accounted cost of queued frames; each frame costs
    /// `DRAIN_FRAME_OVERHEAD + frame.len()` (see `frame_cost`).
    bytes_used: usize,
    /// Byte budget for `bytes_used`; `0` means unbounded (see `Queue::unbounded`).
    max_bytes: usize,
}
39
40impl QueueInner {
41 fn new(max_bytes: usize) -> Self {
43 Self {
44 frames: VecDeque::new(),
45 bytes_used: 0,
46 max_bytes,
47 }
48 }
49
50 #[inline]
52 fn frame_cost(frame: &[u8]) -> usize {
53 DRAIN_FRAME_OVERHEAD + frame.len()
54 }
55}
56
/// Thread-safe FIFO of byte frames with optional byte-budget admission
/// control; all state sits behind a single mutex.
pub struct Queue {
    // Guarded state; accessed via crate::lock_or_recover so a poisoned
    // lock is recovered rather than propagated.
    inner: Mutex<QueueInner>,
}
74
75impl Queue {
76 pub fn new(max_bytes: usize) -> Self {
90 Self {
91 inner: Mutex::new(QueueInner::new(max_bytes)),
92 }
93 }
94
95 pub fn unbounded() -> Self {
105 Self::new(0)
106 }
107
108 pub fn push(&self, frame: &[u8]) -> Result<(), Error> {
115 let cost = QueueInner::frame_cost(frame);
116 let mut inner = crate::lock_or_recover(&self.inner);
117
118 if inner.max_bytes > 0 && inner.bytes_used + cost > inner.max_bytes {
119 return Err(Error::ChannelFull);
120 }
121
122 inner.frames.push_back(frame.to_vec());
123 inner.bytes_used = inner.bytes_used.saturating_add(cost);
124 Ok(())
125 }
126
127 #[must_use]
131 pub fn try_pop(&self) -> Option<Vec<u8>> {
132 let mut inner = crate::lock_or_recover(&self.inner);
133 let frame = inner.frames.pop_front()?;
134 inner.bytes_used -= QueueInner::frame_cost(&frame);
135 Some(frame)
136 }
137
138 #[must_use]
151 pub fn drain_all(&self) -> Vec<u8> {
152 let (mut frames, bytes_used) = {
154 let mut inner = crate::lock_or_recover(&self.inner);
155 if inner.frames.is_empty() {
156 return Vec::new();
157 }
158 let frames = std::mem::take(&mut inner.frames);
159 let bytes_used = inner.bytes_used;
160 inner.bytes_used = 0;
161 (frames, bytes_used)
162 };
163 let output_size = 4usize.saturating_add(bytes_used);
165 let mut buf = Vec::with_capacity(output_size);
166
167 let count = frames.len() as u32;
169 buf.extend_from_slice(&count.to_le_bytes());
170
171 for frame in frames.make_contiguous() {
173 let len = frame.len() as u32;
174 buf.extend_from_slice(&len.to_le_bytes());
175 buf.extend_from_slice(frame);
176 }
177
178 buf
179 }
180
181 #[must_use]
183 pub fn frame_count(&self) -> usize {
184 crate::lock_or_recover(&self.inner).frames.len()
185 }
186
187 #[must_use]
189 pub fn bytes_used(&self) -> usize {
190 crate::lock_or_recover(&self.inner).bytes_used
191 }
192
193 #[must_use]
195 pub fn max_bytes(&self) -> usize {
196 crate::lock_or_recover(&self.inner).max_bytes
197 }
198
199 pub fn clear(&self) {
201 let mut inner = crate::lock_or_recover(&self.inner);
202 inner.frames.clear();
203 inner.bytes_used = 0;
204 }
205}
206
207impl std::fmt::Debug for Queue {
208 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
209 let inner = crate::lock_or_recover(&self.inner);
210 f.debug_struct("Queue")
211 .field("frame_count", &inner.frames.len())
212 .field("bytes_used", &inner.bytes_used)
213 .field("max_bytes", &inner.max_bytes)
214 .finish()
215 }
216}
217
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn push_and_pop() {
        let queue = Queue::new(1024);
        for frame in [&b"alpha"[..], b"beta", b"gamma"] {
            queue.push(frame).unwrap();
        }

        assert_eq!(queue.frame_count(), 3);
        assert_eq!(queue.try_pop().unwrap(), b"alpha");
        assert_eq!(queue.try_pop().unwrap(), b"beta");
        assert_eq!(queue.try_pop().unwrap(), b"gamma");
        assert!(queue.try_pop().is_none());
    }

    #[test]
    fn push_within_limit() {
        // Two 4-byte frames at cost 8 each exactly fill a 16-byte budget.
        let queue = Queue::new(16);
        queue.push(b"aaaa").unwrap();
        queue.push(b"bbbb").unwrap();
        assert_eq!(queue.frame_count(), 2);
        assert_eq!(queue.bytes_used(), 16);
    }

    #[test]
    fn push_exceeds_limit() {
        let queue = Queue::new(16);
        queue.push(b"aaaa").unwrap();
        queue.push(b"bbbb").unwrap();

        let err = queue.push(b"cccc").unwrap_err();
        assert!(matches!(err, Error::ChannelFull));
        assert_eq!(err.to_string(), "channel full: byte limit reached");

        // A rejected push must not disturb the frames already queued.
        assert_eq!(queue.frame_count(), 2);
        assert_eq!(queue.try_pop().unwrap(), b"aaaa");
        assert_eq!(queue.try_pop().unwrap(), b"bbbb");
    }

    #[test]
    fn drain_all_format() {
        let queue = Queue::new(1024);
        queue.push(b"hello").unwrap();
        queue.push(b"world").unwrap();

        let blob = queue.drain_all();

        // Header: little-endian u32 frame count.
        assert_eq!(u32::from_le_bytes(blob[0..4].try_into().unwrap()), 2);

        // First record: length prefix, then payload.
        let first_len = u32::from_le_bytes(blob[4..8].try_into().unwrap()) as usize;
        assert_eq!(first_len, 5);
        assert_eq!(&blob[8..8 + first_len], b"hello");

        // Second record follows immediately after the first payload.
        let second_start = 8 + first_len;
        let second_len =
            u32::from_le_bytes(blob[second_start..second_start + 4].try_into().unwrap()) as usize;
        assert_eq!(second_len, 5);
        assert_eq!(&blob[second_start + 4..second_start + 4 + second_len], b"world");

        // Draining empties the queue and resets accounting.
        assert_eq!(queue.frame_count(), 0);
        assert_eq!(queue.bytes_used(), 0);
    }

    #[test]
    fn drain_frees_capacity() {
        let queue = Queue::new(16);
        queue.push(b"aaaa").unwrap();
        queue.push(b"bbbb").unwrap();
        assert!(queue.push(b"cccc").is_err());

        let blob = queue.drain_all();
        assert!(!blob.is_empty());
        assert_eq!(queue.bytes_used(), 0);

        // Capacity freed by the drain is available again.
        queue.push(b"dddd").unwrap();
        queue.push(b"eeee").unwrap();
        assert_eq!(queue.frame_count(), 2);
    }

    #[test]
    fn unbounded_mode() {
        let queue = Queue::unbounded();
        assert_eq!(queue.max_bytes(), 0);

        for value in 0u32..10_000 {
            queue.push(&value.to_le_bytes()).unwrap();
        }
        assert_eq!(queue.frame_count(), 10_000);
    }

    #[test]
    fn frame_count_and_bytes() {
        let queue = Queue::new(1024);

        assert_eq!(queue.frame_count(), 0);
        assert_eq!(queue.bytes_used(), 0);
        assert_eq!(queue.max_bytes(), 1024);

        queue.push(b"abc").unwrap();
        assert_eq!(queue.frame_count(), 1);
        assert_eq!(queue.bytes_used(), 7);

        queue.push(b"de").unwrap();
        assert_eq!(queue.frame_count(), 2);
        assert_eq!(queue.bytes_used(), 13);

        let _ = queue.try_pop();
        assert_eq!(queue.frame_count(), 1);
        assert_eq!(queue.bytes_used(), 6);
    }

    #[test]
    fn clear() {
        let queue = Queue::new(1024);
        for frame in [&b"one"[..], b"two", b"three"] {
            queue.push(frame).unwrap();
        }

        assert_eq!(queue.frame_count(), 3);
        queue.clear();
        assert_eq!(queue.frame_count(), 0);
        assert_eq!(queue.bytes_used(), 0);
        assert!(queue.try_pop().is_none());
    }

    #[test]
    fn concurrent_push_pop() {
        let queue = Queue::unbounded();

        // Scoped threads borrow `queue` directly; no Arc needed.
        let consumer_popped = std::thread::scope(|scope| {
            let producer = scope.spawn(|| {
                for value in 0u32..1000 {
                    queue.push(&value.to_le_bytes()).unwrap();
                }
            });

            let consumer = scope.spawn(|| {
                let mut popped = 0usize;
                while popped < 1000 {
                    if queue.try_pop().is_some() {
                        popped += 1;
                    } else {
                        std::thread::yield_now();
                    }
                }
                popped
            });

            producer.join().unwrap();
            consumer.join().unwrap()
        });

        assert_eq!(consumer_popped + queue.frame_count(), 1000);
    }

    #[test]
    fn empty_drain() {
        let queue = Queue::new(1024);
        assert!(queue.drain_all().is_empty());
    }

    #[test]
    fn drain_then_push() {
        let queue = Queue::new(1024);
        queue.push(b"first").unwrap();
        assert!(!queue.drain_all().is_empty());

        queue.push(b"second").unwrap();
        assert_eq!(queue.frame_count(), 1);
        assert_eq!(queue.try_pop().unwrap(), b"second");
    }
}