use crate::error::ZmqError;
use fibre::{mpmc::{bounded_async, AsyncReceiver, AsyncSender}, TryRecvError, TrySendError, RecvError};
#[derive(Debug)]
pub(crate) enum PushError<T: Send + 'static> {
Full(T),
Closed(T),
}
#[derive(Debug)]
pub(crate) struct FairQueue<T: Send + 'static> {
receiver: AsyncReceiver<T>,
sender: AsyncSender<T>,
hwm: usize,
}
impl<T: Send + 'static> FairQueue<T> {
pub fn new(capacity: usize) -> Self {
let (sender, receiver) = bounded_async(capacity.max(1));
Self {
receiver,
sender,
hwm: capacity.max(1),
}
}
pub fn pipe_attached(&self, pipe_read_id: usize) {
tracing::trace!(pipe_id = pipe_read_id, hwm = self.hwm, "FairQueue pipe attached");
}
pub fn pipe_detached(&self, pipe_read_id: usize) {
tracing::trace!(pipe_id = pipe_read_id, "FairQueue pipe detached");
}
pub async fn push_item(&self, item: T) -> Result<(), ZmqError> {
self.sender.send(item).await.map_err(|e| {
tracing::error!("FairQueue send error (queue closed?): {:?}", e);
ZmqError::Internal("FairQueue channel closed unexpectedly".into())
})
}
pub async fn pop_item(&self) -> Result<Option<T>, ZmqError> {
match self.receiver.recv().await {
Ok(item) => Ok(Some(item)),
Err(RecvError::Disconnected) => Ok(None), }
}
pub fn try_push_item(&self, item: T) -> Result<(), PushError<T>> {
match self.sender.try_send(item) {
Ok(()) => Ok(()),
Err(TrySendError::Full(returned_item)) => Err(PushError::Full(returned_item)),
Err(TrySendError::Closed(returned_item)) => {
tracing::error!("FairQueue try_send failed: Channel was closed.");
Err(PushError::Closed(returned_item))
}
_ => unreachable!(),
}
}
pub fn try_pop_item(&self) -> Result<Option<T>, ZmqError> {
match self.receiver.try_recv() {
Ok(item) => Ok(Some(item)),
Err(TryRecvError::Empty) => Ok(None),
Err(TryRecvError::Disconnected) => {
tracing::error!("FairQueue try_recv error: Channel closed");
Err(ZmqError::Internal("FairQueue channel closed unexpectedly".into()))
}
}
}
pub fn capacity(&self) -> usize {
self.hwm
}
pub fn len(&self) -> usize {
self.receiver.len()
}
pub fn is_empty(&self) -> bool {
self.receiver.is_empty()
}
pub fn close(&self) {
self.sender.close();
}
}