1use std::sync::Mutex;
21
22use crate::Error;
23use crate::codec::DRAIN_FRAME_OVERHEAD;
24
/// Mutable queue state; every access goes through the mutex held by `Queue`.
struct QueueInner {
    /// Queued frames stored back to back, each as a little-endian `u32`
    /// payload length followed by the payload bytes.
    wire_data: Vec<u8>,
    /// Number of frames currently stored and not yet popped or drained.
    frame_count: u32,
    /// Offset into `wire_data` of the oldest frame that has not been popped.
    read_pos: usize,
    /// Sum of the accounted cost (`DRAIN_FRAME_OVERHEAD` + payload length)
    /// of every queued frame.
    bytes_used: usize,
    /// Upper bound for `bytes_used`; `0` disables the byte limit.
    max_bytes: usize,
}
46
47impl QueueInner {
48 fn new(max_bytes: usize) -> Self {
50 Self {
51 wire_data: Vec::new(),
52 frame_count: 0,
53 read_pos: 0,
54 bytes_used: 0,
55 max_bytes,
56 }
57 }
58
59 #[inline]
61 fn frame_cost(frame: &[u8]) -> usize {
62 DRAIN_FRAME_OVERHEAD + frame.len()
63 }
64}
65
66pub struct Queue {
81 inner: Mutex<QueueInner>,
82}
83
84impl Queue {
85 pub fn new(max_bytes: usize) -> Self {
99 Self {
100 inner: Mutex::new(QueueInner::new(max_bytes)),
101 }
102 }
103
104 pub fn unbounded() -> Self {
114 Self::new(0)
115 }
116
117 pub fn push(&self, frame: &[u8]) -> Result<(), Error> {
124 if frame.len() > u32::MAX as usize
127 || DRAIN_FRAME_OVERHEAD.checked_add(frame.len()).is_none()
128 {
129 return Err(Error::PayloadTooLarge(frame.len()));
130 }
131
132 let cost = QueueInner::frame_cost(frame);
133 let mut inner = crate::lock_or_recover(&self.inner);
134
135 if inner.max_bytes > 0 && inner.bytes_used + cost > inner.max_bytes {
136 return Err(Error::ChannelFull);
137 }
138
139 if inner.frame_count == u32::MAX {
141 return Err(Error::ChannelFull);
142 }
143
144 inner
146 .wire_data
147 .extend_from_slice(&(frame.len() as u32).to_le_bytes());
148 inner.wire_data.extend_from_slice(frame);
149 inner.frame_count += 1;
150 inner.bytes_used = inner.bytes_used.saturating_add(cost);
151 Ok(())
152 }
153
154 #[must_use]
158 pub fn try_pop(&self) -> Option<Vec<u8>> {
159 let mut inner = crate::lock_or_recover(&self.inner);
160 if inner.frame_count == 0 {
161 return None;
162 }
163 let len_bytes: [u8; 4] = inner.wire_data[inner.read_pos..inner.read_pos + 4]
164 .try_into()
165 .unwrap();
166 let payload_len = u32::from_le_bytes(len_bytes) as usize;
167 let payload_start = inner.read_pos + 4;
168 let frame = inner.wire_data[payload_start..payload_start + payload_len].to_vec();
169 let cost = DRAIN_FRAME_OVERHEAD + payload_len;
170 inner.read_pos += cost;
171 inner.frame_count -= 1;
172 inner.bytes_used -= cost;
173
174 if inner.frame_count == 0 {
178 inner.wire_data.clear();
179 inner.read_pos = 0;
180 } else if inner.read_pos > inner.wire_data.len() / 2 {
181 let rp = inner.read_pos;
182 inner.wire_data.copy_within(rp.., 0);
183 let new_len = inner.wire_data.len() - rp;
184 inner.wire_data.truncate(new_len);
185 inner.read_pos = 0;
186 }
187
188 Some(frame)
189 }
190
191 #[must_use]
204 pub fn drain_all(&self) -> Vec<u8> {
205 let (wire_data, read_pos, frame_count) = {
208 let mut inner = crate::lock_or_recover(&self.inner);
209 if inner.frame_count == 0 {
210 return Vec::new();
211 }
212 let wire_data = std::mem::take(&mut inner.wire_data);
213 let read_pos = inner.read_pos;
214 let frame_count = inner.frame_count;
215 inner.read_pos = 0;
216 inner.frame_count = 0;
217 inner.bytes_used = 0;
218 (wire_data, read_pos, frame_count)
219 };
220 let live_data = &wire_data[read_pos..];
222 let output_size = 4 + live_data.len();
223 let mut buf = Vec::with_capacity(output_size);
224 buf.extend_from_slice(&frame_count.to_le_bytes());
225 buf.extend_from_slice(live_data);
226 buf
227 }
228
229 #[must_use]
231 pub fn frame_count(&self) -> usize {
232 crate::lock_or_recover(&self.inner).frame_count as usize
233 }
234
235 #[must_use]
237 pub fn bytes_used(&self) -> usize {
238 crate::lock_or_recover(&self.inner).bytes_used
239 }
240
241 #[must_use]
243 pub fn max_bytes(&self) -> usize {
244 crate::lock_or_recover(&self.inner).max_bytes
245 }
246
247 pub fn clear(&self) {
249 let mut inner = crate::lock_or_recover(&self.inner);
250 inner.wire_data.clear();
251 inner.frame_count = 0;
252 inner.read_pos = 0;
253 inner.bytes_used = 0;
254 }
255}
256
257impl std::fmt::Debug for Queue {
258 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
259 let inner = crate::lock_or_recover(&self.inner);
260 f.debug_struct("Queue")
261 .field("frame_count", &inner.frame_count)
262 .field("bytes_used", &inner.bytes_used)
263 .field("max_bytes", &inner.max_bytes)
264 .finish()
265 }
266}
267
#[cfg(test)]
mod tests {
    use super::*;

    /// Frames come back out in the order they were pushed.
    #[test]
    fn push_and_pop() {
        let queue = Queue::new(1024);
        let frames: [&[u8]; 3] = [b"alpha", b"beta", b"gamma"];

        for &frame in &frames {
            queue.push(frame).unwrap();
        }
        assert_eq!(queue.frame_count(), 3);

        for &expected in &frames {
            assert_eq!(queue.try_pop().unwrap(), expected);
        }
        assert!(queue.try_pop().is_none());
    }

    /// Two 4-byte frames cost 8 bytes each and exactly fill a 16-byte budget.
    #[test]
    fn push_within_limit() {
        let queue = Queue::new(16);
        let frames: [&[u8]; 2] = [b"aaaa", b"bbbb"];

        for &frame in &frames {
            queue.push(frame).unwrap();
        }

        assert_eq!(queue.frame_count(), 2);
        assert_eq!(queue.bytes_used(), 16);
    }

    /// A push that would exceed the budget fails and leaves the queue intact.
    #[test]
    fn push_exceeds_limit() {
        let queue = Queue::new(16);
        queue.push(b"aaaa").unwrap();
        queue.push(b"bbbb").unwrap();

        let rejected = queue.push(b"cccc");
        let err = rejected.unwrap_err();
        assert!(matches!(err, Error::ChannelFull));
        assert_eq!(err.to_string(), "channel full: byte limit reached");

        assert_eq!(queue.frame_count(), 2);
        assert_eq!(queue.try_pop().unwrap(), b"aaaa");
        assert_eq!(queue.try_pop().unwrap(), b"bbbb");
    }

    /// The drained blob is `[count][len][payload][len][payload]...`.
    #[test]
    fn drain_all_format() {
        let queue = Queue::new(1024);
        queue.push(b"hello").unwrap();
        queue.push(b"world").unwrap();

        let blob = queue.drain_all();
        let read_u32 =
            |at: usize| -> u32 { u32::from_le_bytes(blob[at..at + 4].try_into().unwrap()) };

        assert_eq!(read_u32(0), 2);

        let first_len = read_u32(4) as usize;
        assert_eq!(first_len, 5);
        assert_eq!(&blob[8..8 + first_len], b"hello");

        let second_at = 8 + first_len;
        let second_len = read_u32(second_at) as usize;
        assert_eq!(second_len, 5);
        assert_eq!(
            &blob[second_at + 4..second_at + 4 + second_len],
            b"world"
        );

        assert_eq!(queue.frame_count(), 0);
        assert_eq!(queue.bytes_used(), 0);
    }

    /// Draining a full queue makes room for new frames.
    #[test]
    fn drain_frees_capacity() {
        let queue = Queue::new(16);
        queue.push(b"aaaa").unwrap();
        queue.push(b"bbbb").unwrap();
        assert!(queue.push(b"cccc").is_err());

        let blob = queue.drain_all();
        assert!(!blob.is_empty());
        assert_eq!(queue.bytes_used(), 0);

        queue.push(b"dddd").unwrap();
        queue.push(b"eeee").unwrap();
        assert_eq!(queue.frame_count(), 2);
    }

    /// An unbounded queue reports a limit of zero and accepts many frames.
    #[test]
    fn unbounded_mode() {
        let queue = Queue::unbounded();
        assert_eq!(queue.max_bytes(), 0);

        (0u32..10_000).for_each(|i| queue.push(&i.to_le_bytes()).unwrap());

        assert_eq!(queue.frame_count(), 10_000);
    }

    /// Counters follow every push and pop.
    #[test]
    fn frame_count_and_bytes() {
        let queue = Queue::new(1024);

        assert_eq!(queue.frame_count(), 0);
        assert_eq!(queue.bytes_used(), 0);
        assert_eq!(queue.max_bytes(), 1024);

        queue.push(b"abc").unwrap();
        assert_eq!(queue.frame_count(), 1);
        assert_eq!(queue.bytes_used(), 7);

        queue.push(b"de").unwrap();
        assert_eq!(queue.frame_count(), 2);
        assert_eq!(queue.bytes_used(), 13);

        let _ = queue.try_pop();
        assert_eq!(queue.frame_count(), 1);
        assert_eq!(queue.bytes_used(), 6);
    }

    /// `clear` drops every frame and zeroes the counters.
    #[test]
    fn clear() {
        let queue = Queue::new(1024);
        let frames: [&[u8]; 3] = [b"one", b"two", b"three"];
        for &frame in &frames {
            queue.push(frame).unwrap();
        }
        assert_eq!(queue.frame_count(), 3);

        queue.clear();

        assert_eq!(queue.frame_count(), 0);
        assert_eq!(queue.bytes_used(), 0);
        assert!(queue.try_pop().is_none());
    }

    /// One producer and one consumer thread account for every frame.
    #[test]
    fn concurrent_push_pop() {
        use std::sync::Arc;

        let shared = Arc::new(Queue::unbounded());

        let producer = {
            let q = Arc::clone(&shared);
            std::thread::spawn(move || {
                (0u32..1000).for_each(|i| q.push(&i.to_le_bytes()).unwrap());
            })
        };

        let consumer = {
            let q = Arc::clone(&shared);
            std::thread::spawn(move || {
                let mut popped = 0usize;
                loop {
                    if let Some(_frame) = q.try_pop() {
                        popped += 1;
                    }
                    if popped >= 1000 {
                        break popped;
                    }
                    std::thread::yield_now();
                }
            })
        };

        producer.join().unwrap();
        let consumer_popped = consumer.join().unwrap();

        let remaining = shared.frame_count();
        assert_eq!(consumer_popped + remaining, 1000);
    }

    /// Draining an empty queue yields an empty blob.
    #[test]
    fn empty_drain() {
        let queue = Queue::new(1024);
        assert!(queue.drain_all().is_empty());
    }

    /// The queue stays usable after a drain.
    #[test]
    fn drain_then_push() {
        let queue = Queue::new(1024);
        queue.push(b"first").unwrap();
        assert!(!queue.drain_all().is_empty());

        queue.push(b"second").unwrap();
        assert_eq!(queue.frame_count(), 1);
        assert_eq!(queue.try_pop().unwrap(), b"second");
    }
}