use std::cmp::Ordering;
use std::fmt::Debug;
use std::ops::Add;
use async_trait::async_trait;
use thiserror::Error;
use crate::util::element::Element;
pub mod mpsc_bounded_channel_queue;
mod mpsc_bounded_channel_queue_test;
pub mod mpsc_unbounded_channel_queue;
mod mpsc_unbounded_channel_queue_test;
pub mod priority_queue;
mod priority_queue_test;
pub mod ring_queue;
mod ring_queue_test;
#[derive(Error, Debug, PartialEq)]
pub enum QueueError<E> {
#[error("Failed to offer an element: {0:?}")]
OfferError(E),
#[error("Failed to pool an element")]
PoolError,
#[error("Failed to peek an element")]
PeekError,
#[error("Failed to contains an element")]
ContainsError,
#[error("Failed to interrupt")]
InterruptedError,
#[error("Failed to timeout")]
TimeoutError,
}
#[derive(Debug, Clone)]
pub enum QueueSize {
Limitless,
Limited(usize),
}
impl QueueSize {
fn increment(&mut self) {
match self {
QueueSize::Limited(c) => {
*c += 1;
}
_ => {}
}
}
fn decrement(&mut self) {
match self {
QueueSize::Limited(c) => {
*c -= 1;
}
_ => {}
}
}
pub fn is_limitless(&self) -> bool {
match self {
QueueSize::Limitless => true,
_ => false,
}
}
pub fn to_option(&self) -> Option<usize> {
match self {
QueueSize::Limitless => None,
QueueSize::Limited(c) => Some(*c),
}
}
pub fn to_usize(&self) -> usize {
match self {
QueueSize::Limitless => usize::MAX,
QueueSize::Limited(c) => *c,
}
}
}
impl Add for QueueSize {
type Output = QueueSize;
fn add(self, other: QueueSize) -> QueueSize {
match (self, other) {
(QueueSize::Limitless, _) | (_, QueueSize::Limitless) => QueueSize::Limitless,
(QueueSize::Limited(a), QueueSize::Limited(b)) => QueueSize::Limited(a + b),
}
}
}
impl PartialEq<Self> for QueueSize {
fn eq(&self, other: &Self) -> bool {
match (self, other) {
(QueueSize::Limitless, QueueSize::Limitless) => true,
(QueueSize::Limited(l), QueueSize::Limited(r)) => l == r,
_ => false,
}
}
}
impl PartialOrd<QueueSize> for QueueSize {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
match (self, other) {
(QueueSize::Limitless, QueueSize::Limitless) => Some(Ordering::Equal),
(QueueSize::Limitless, _) => Some(Ordering::Greater),
(_, QueueSize::Limitless) => Some(Ordering::Less),
(QueueSize::Limited(l), QueueSize::Limited(r)) => l.partial_cmp(r),
}
}
}
#[async_trait]
pub trait QueueBase<E: Element>: Debug + Send + Sync {
async fn is_empty(&self) -> bool {
self.len().await == QueueSize::Limited(0)
}
async fn non_empty(&self) -> bool {
!self.is_empty().await
}
async fn is_full(&self) -> bool {
self.capacity().await == self.len().await
}
async fn non_full(&self) -> bool {
!self.is_full().await
}
async fn len(&self) -> QueueSize;
async fn capacity(&self) -> QueueSize;
}
#[async_trait]
pub trait QueueWriteFactory<E: Element>: QueueBase<E> {
type Writer: QueueWriter<E>;
fn writer(&self) -> Self::Writer;
}
#[async_trait::async_trait]
pub trait QueueWriter<E: Element>: QueueBase<E> {
async fn offer(&mut self, element: E) -> Result<(), QueueError<E>>;
async fn offer_all(&mut self, elements: Vec<E>) -> Result<(), QueueError<E>> {
for e in elements {
self.offer(e).await?;
}
Ok(())
}
}
#[async_trait::async_trait]
pub trait QueueReadFactory<E: Element>: QueueBase<E> {
type Reader: QueueReader<E>;
fn reader(&self) -> Self::Reader;
}
#[async_trait]
pub trait QueueReader<E: Element>: QueueBase<E> {
async fn poll(&mut self) -> Result<Option<E>, QueueError<E>>;
async fn clean_up(&mut self);
}
#[async_trait]
pub trait HasPeekBehavior<E: Element>: QueueReader<E> {
async fn peek(&self) -> Result<Option<E>, QueueError<E>>;
}
#[async_trait]
pub trait HasContainsBehavior<E: Element>: QueueReader<E> {
async fn contains(&self, element: &E) -> bool;
}
#[async_trait]
pub trait BlockingQueueBase<E: Element>: QueueBase<E> + Send {
async fn remaining_capacity(&self) -> QueueSize;
async fn is_interrupted(&self) -> bool;
}
#[async_trait]
pub trait BlockingQueueWriter<E: Element>: BlockingQueueBase<E> + QueueWriter<E> {
async fn put(&mut self, element: E) -> Result<(), QueueError<E>>;
async fn interrupt(&mut self);
}
#[async_trait]
pub trait BlockingQueueReader<E: Element>: BlockingQueueBase<E> {
async fn take(&mut self) -> Result<Option<E>, QueueError<E>>;
}