// pulsedeck 0.1.7
//
// A cyber-synthwave internet radio player and smart tape recorder for your terminal.
use std::collections::VecDeque;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Condvar, Mutex};

/// Bounded, blocking byte FIFO shared between a producer and a consumer
/// (the "resiliency buffer").
///
/// All access to the stored bytes goes through the internal mutex; the two
/// condition variables let each side sleep until the other makes progress or
/// the queue is marked as disconnected.
pub(super) struct BufferQueue {
    /// Upper bound on the number of buffered bytes, enforced by `push`.
    pub(super) capacity: usize,
    /// Buffered bytes, oldest at the front.
    queue: Mutex<VecDeque<u8>>,
    /// Waited on by `pop`; notified by `push` and `set_disconnected`.
    cv_read: Condvar,
    /// Waited on by `push`; notified by `pop` and `set_disconnected`.
    cv_write: Condvar,
    /// When `true`, blocked callers stop waiting: `push` discards its input
    /// and `pop` reports end-of-stream once the queue is empty.
    disconnected: AtomicBool,
}

impl BufferQueue {
    /// Creates an empty, connected queue bounded to `capacity` bytes.
    ///
    /// Storage for `capacity` bytes is reserved up front.
    pub(super) fn new(capacity: usize) -> Self {
        Self {
            queue: Mutex::new(VecDeque::with_capacity(capacity)),
            cv_read: Condvar::new(),
            cv_write: Condvar::new(),
            capacity,
            disconnected: AtomicBool::new(false),
        }
    }

    /// Appends `bytes` to the queue, blocking while there is not enough room.
    ///
    /// A slice that fits within `capacity` is enqueued in one piece once the
    /// queue has room for all of it. A slice larger than `capacity` is
    /// enqueued in pieces of at most `capacity` bytes, each piece waiting for
    /// the consumer to free space; waiting for the whole slice to fit at once
    /// could never succeed and would block the producer forever.
    ///
    /// If the queue is (or becomes) disconnected, the bytes not yet enqueued
    /// are silently dropped and the call returns.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub(super) fn push(&self, bytes: &[u8]) {
        // A zero capacity could never accept a byte (and `chunks(0)` panics),
        // so treat it as a one-byte queue instead of hanging the producer.
        let limit = self.capacity.max(1);
        let mut queue = self.queue.lock().unwrap();
        for chunk in bytes.chunks(limit) {
            // Block the producer until this piece fits or we are told to stop.
            while queue.len() + chunk.len() > limit && !self.disconnected.load(Ordering::SeqCst) {
                queue = self.cv_write.wait(queue).unwrap();
            }
            if self.disconnected.load(Ordering::SeqCst) {
                return;
            }
            queue.extend(chunk);
            self.cv_read.notify_all();
        }
    }

    /// Moves up to `buf.len()` of the oldest bytes into `buf`, blocking while
    /// the queue is empty and still connected.
    ///
    /// Returns the number of bytes written to the front of `buf`. `Ok(0)`
    /// means either that `buf` is empty, or that the queue is disconnected
    /// and fully drained (end of stream / network disconnection). The current
    /// implementation never returns `Err`.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub(super) fn pop(&self, buf: &mut [u8]) -> std::io::Result<usize> {
        // Nothing can be delivered into an empty buffer; return right away
        // rather than blocking for data and then reporting a misleading 0.
        if buf.is_empty() {
            return Ok(0);
        }

        let mut queue = self.queue.lock().unwrap();
        while queue.is_empty() && !self.disconnected.load(Ordering::SeqCst) {
            queue = self.cv_read.wait(queue).unwrap();
        }
        // The wait loop only exits with an empty queue when disconnected.
        if queue.is_empty() {
            return Ok(0); // EOF or network disconnection
        }

        let count = buf.len().min(queue.len());
        for (slot, byte) in buf.iter_mut().zip(queue.drain(..count)) {
            *slot = byte;
        }
        // Space was freed: let a blocked producer re-check its condition.
        self.cv_write.notify_all();
        Ok(count)
    }

    /// Returns the number of bytes currently buffered.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub(super) fn len(&self) -> usize {
        self.queue.lock().unwrap().len()
    }

    /// Sets the disconnected flag to `disc` and wakes every blocked caller.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub(super) fn set_disconnected(&self, disc: bool) {
        self.disconnected.store(disc, Ordering::SeqCst);
        // Take the lock before notifying: a thread that has already checked
        // the flag but not yet started waiting still holds the lock, so we
        // cannot notify until it is actually waiting and will see the wake-up.
        let _queue = self.queue.lock().unwrap();
        // Wake up both consumer and producer to terminate gracefully
        self.cv_read.notify_all();
        self.cv_write.notify_all();
    }
}