1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128
use crossbeam_channel as cbc;
/// A simple multi-producer, multi-consumer blocking queue.
///
/// This queue allows producers and consumers to respectively send and receive data
/// from the provided buffer. It is backed by a [`crossbeam_channel`] channel, and
/// the queue holds both the sending and the receiving handle of that channel.
///
/// A blocking queue can be paired with [`BiChannelIO`] to maintain an input buffer:
/// ```
/// use lc3_ensemble::sim::io::{BiChannelIO, BlockingQueue, Stop};
///
/// let queue = BlockingQueue::new(None);
///
/// // fn writer(_: u8) -> Result<(), Stop> { ... }
/// // let mcr = ...
/// # fn writer(_: u8) -> Result<(), Stop> { Ok(()) }
/// # let mcr = std::sync::Arc::default();
///
/// let io = BiChannelIO::new(queue.reader(), writer, mcr);
/// ```
///
/// [`BiChannelIO`]: super::BiChannelIO
pub struct BlockingQueue<T> {
// Sending handle of the backing channel; elements are pushed through it.
head: cbc::Sender<T>,
// Receiving handle of the backing channel; elements are popped through it.
tail: cbc::Receiver<T>
}
impl<T> BlockingQueue<T> {
/// Creates a new blocking queue, optionally with a maximum size.
///
/// `Some(n)` backs the queue with a bounded channel of capacity `n`;
/// `None` backs it with an unbounded channel.
pub fn new(size: Option<usize>) -> Self {
    // No size -> unbounded channel; otherwise a bounded channel of that capacity.
    let (head, tail) = size.map_or_else(cbc::unbounded, cbc::bounded);
    Self { head, tail }
}
/// Adds an element to the front of the queue, blocking if the queue is full.
///
/// Sending can only fail when the channel is disconnected. Because this
/// queue holds a receiver for its own channel, that cannot happen while
/// the queue exists, so this function is not expected to panic.
pub fn push(&self, t: T) {
    self.head
        .send(t)
        .expect("BlockingQueue owns a receiver, so its channel cannot be disconnected");
}
/// Attempts to add an element to the front of the queue,
/// returning an error if the queue is full.
///
/// This uses [`crossbeam_channel::TrySendError`] as its error,
/// but this function cannot return [`TrySendError::Disconnected`],
/// since the [`BlockingQueue`] has to exist to call this function
/// (and the queue itself holds a receiver for its channel).
///
/// [`TrySendError::Disconnected`]: crossbeam_channel::TrySendError::Disconnected
pub fn try_push(&self, t: T) -> Result<(), cbc::TrySendError<T>> {
self.head.try_send(t)
}
/// Removes an element from the back of the queue, blocking if the queue is empty.
///
/// Receiving can only fail when the channel is disconnected. Because this
/// queue holds a sender for its own channel, that cannot happen while
/// the queue exists, so this function is not expected to panic.
pub fn pop(&self) -> T {
    self.tail
        .recv()
        .expect("BlockingQueue owns a sender, so its channel cannot be disconnected")
}
/// Attempts to remove an element from the back of the queue,
/// returning an error if the queue is empty.
///
/// This uses [`crossbeam_channel::TryRecvError`] as its error,
/// but this function cannot return [`TryRecvError::Disconnected`],
/// since the [`BlockingQueue`] has to exist to call this function
/// (and the queue itself holds a sender for its channel).
///
/// [`TryRecvError::Disconnected`]: crossbeam_channel::TryRecvError::Disconnected
pub fn try_pop(&self) -> Result<T, cbc::TryRecvError> {
self.tail.try_recv()
}
/// Hands out a clone of the queue's sending head.
///
/// The returned handle gives access to every method of
/// [`crossbeam_channel::Sender`]. It is an owned clone whose lifetime
/// is not tied to the queue's lifetime, so callers should check for
/// disconnection when calling [`Sender::send`] and similar functions.
///
/// [`Sender::send`]: crossbeam_channel::Sender::send
pub fn head(&self) -> cbc::Sender<T> {
    cbc::Sender::clone(&self.head)
}
/// Hands out a clone of the queue's receiving tail.
///
/// The returned handle gives access to every method of
/// [`crossbeam_channel::Receiver`]. It is an owned clone whose lifetime
/// is not tied to the queue's lifetime, so callers should check for
/// disconnection when calling [`Receiver::recv`] and similar functions.
///
/// [`Receiver::recv`]: crossbeam_channel::Receiver::recv
pub fn tail(&self) -> cbc::Receiver<T> {
    cbc::Receiver::clone(&self.tail)
}
/// Returns the number of elements currently in the channel.
///
/// The count is read from the sending head of the queue.
pub fn len(&self) -> usize {
self.head.len()
}
/// Returns whether the channel currently holds no elements
/// (`true` if it is empty).
pub fn is_empty(&self) -> bool {
self.head.is_empty()
}
/// Builds a reader closure so this queue can interop with [`BiChannelIO`].
///
/// The closure owns its own clone of the queue's receiving tail. Each call
/// receives one element from the queue; a failed receive is reported as
/// [`Stop`](super::Stop).
///
/// Pass the result as the `reader` parameter of [`BiChannelIO::new`]
/// to let the IO device pull data from this queue.
/// See the struct-level documentation for an example.
///
/// [`BiChannelIO`]: super::BiChannelIO
/// [`BiChannelIO::new`]: super::BiChannelIO::new
pub fn reader(&self) -> impl Fn() -> Result<T, super::Stop> {
    let receiver = self.tail.clone();
    move || match receiver.recv() {
        Ok(element) => Ok(element),
        // The receive error carries no payload; collapse it into `Stop`.
        Err(_) => Err(super::Stop),
    }
}
}
impl<T> Default for BlockingQueue<T> {
    /// Creates a queue with no maximum size.
    fn default() -> Self {
        // `None` asks `new` for a queue without a size limit.
        let size: Option<usize> = None;
        Self::new(size)
    }
}
impl<E> Extend<E> for BlockingQueue<E> {
    /// Pushes every item yielded by `iter` onto the queue, in iteration order.
    ///
    /// Each item goes through [`BlockingQueue::push`], one at a time.
    fn extend<T: IntoIterator<Item = E>>(&mut self, iter: T) {
        iter.into_iter().for_each(|item| self.push(item));
    }
}