use std::collections::VecDeque;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Condvar, Mutex};
/// A bounded, blocking byte queue shared between a producer side (`push`)
/// and a consumer side (`pop`).
///
/// The bytes live in a `VecDeque<u8>` behind a mutex; two condition variables
/// are used to park callers until there is room to write or data to read, and
/// an atomic flag marks the queue as disconnected.
pub(super) struct BufferQueue {
/// Buffered bytes in FIFO order, guarded by the mutex that both condition
/// variables are paired with.
queue: Mutex<VecDeque<u8>>,
/// Waited on by `pop` while the queue is empty; notified by `push` after it
/// enqueues bytes and by `set_disconnected`.
cv_read: Condvar,
/// Waited on by `push` while there is not enough room; notified by `pop`
/// after it removes bytes and by `set_disconnected`.
cv_write: Condvar,
/// Upper bound on the number of buffered bytes that `push` enforces.
pub(super) capacity: usize,
/// Disconnection flag. While it is set, `push` returns without enqueueing
/// and `pop` returns `Ok(0)` once the queue is empty.
disconnected: AtomicBool,
}
impl BufferQueue {
    /// Creates an empty, connected queue that buffers at most `capacity` bytes.
    ///
    /// With a `capacity` of zero there is never room for a byte, so a
    /// non-empty [`push`](Self::push) blocks until the queue is marked
    /// disconnected.
    pub(super) fn new(capacity: usize) -> Self {
        Self {
            queue: Mutex::new(VecDeque::with_capacity(capacity)),
            cv_read: Condvar::new(),
            cv_write: Condvar::new(),
            capacity,
            disconnected: AtomicBool::new(false),
        }
    }

    /// Appends `bytes` to the queue, blocking while there is not enough room.
    ///
    /// A slice of at most `capacity` bytes is enqueued in one piece once it
    /// fits. A longer slice could never fit at once, so it is split into
    /// pieces of at most `capacity` bytes that are enqueued one after another
    /// as readers free up space; pieces from concurrent pushers may therefore
    /// interleave.
    ///
    /// If the queue is (or becomes) disconnected, the bytes that have not been
    /// enqueued yet are silently dropped and the call returns.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub(super) fn push(&self, bytes: &[u8]) {
        // `chunks` panics on a chunk size of zero; with `capacity == 0` the
        // wait condition below still blocks until disconnection.
        let chunk_len = self.capacity.max(1);
        let mut queue = self.queue.lock().unwrap();
        for chunk in bytes.chunks(chunk_len) {
            while queue.len() + chunk.len() > self.capacity
                && !self.disconnected.load(Ordering::SeqCst)
            {
                // Releases the lock while parked and re-acquires it on wake-up.
                queue = self.cv_write.wait(queue).unwrap();
            }
            if self.disconnected.load(Ordering::SeqCst) {
                return;
            }
            queue.extend(chunk);
            // Wake readers after every piece so they can make room for the next one.
            self.cv_read.notify_all();
        }
    }

    /// Moves up to `buf.len()` bytes from the front of the queue into `buf`
    /// and returns how many bytes were written.
    ///
    /// Blocks while the queue is empty and still connected. Returns `Ok(0)`
    /// when the queue is empty and disconnected, or immediately when `buf` is
    /// empty. This function never returns `Err`.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub(super) fn pop(&self, buf: &mut [u8]) -> std::io::Result<usize> {
        // Nothing can be delivered into an empty buffer, so do not wait for data.
        if buf.is_empty() {
            return Ok(0);
        }
        let mut queue = self.queue.lock().unwrap();
        while queue.is_empty() && !self.disconnected.load(Ordering::SeqCst) {
            queue = self.cv_read.wait(queue).unwrap();
        }
        // The loop only exits with an empty queue when it saw the disconnected flag.
        if queue.is_empty() {
            return Ok(0);
        }
        let count = buf.len().min(queue.len());
        for (slot, byte) in buf.iter_mut().zip(queue.drain(..count)) {
            *slot = byte;
        }
        // Space was freed: wake writers waiting for room.
        self.cv_write.notify_all();
        Ok(count)
    }

    /// Returns the number of bytes currently buffered.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub(super) fn len(&self) -> usize {
        self.queue.lock().unwrap().len()
    }

    /// Sets the disconnected flag to `disc` and wakes every thread blocked in
    /// [`push`](Self::push) or [`pop`](Self::pop) so it re-checks the flag.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub(super) fn set_disconnected(&self, disc: bool) {
        self.disconnected.store(disc, Ordering::SeqCst);
        // Waiters test the flag while holding the mutex. Taking the mutex here
        // before notifying means a waiter that saw the old value has already
        // parked (and released the mutex), so it cannot miss this wake-up.
        let _queue = self.queue.lock().unwrap();
        self.cv_read.notify_all();
        self.cv_write.notify_all();
    }
}