#[cfg(test)]
extern crate env_logger;
use std::cmp::Ordering;
use std::fmt::Debug;
use std::sync::Arc;
use anyhow::Result;
use thiserror::Error;
pub use blocking_queue::*;
pub use queue_mpsc::*;
pub use queue_vec::*;
mod blocking_queue;
mod queue_mpsc;
mod queue_vec;
/// Marker trait for types that can be used as queue elements.
///
/// It declares no methods; it only bundles the `Debug + Clone + Send + Sync`
/// bounds so they can be named with a single identifier.
pub trait Element: Debug + Clone + Send + Sync {}

// Emits an empty `Element` impl for every listed type.
macro_rules! impl_element {
    ($($ty:ty),+ $(,)?) => {
        $(impl Element for $ty {})+
    };
}

impl_element!(i8, i16, i32, i64, u8, u16, u32, u64, usize, f32, f64, String);

/// Boxed values are elements when the boxed type satisfies the element bounds.
impl<T> Element for Box<T> where T: Debug + Clone + Send + Sync {}

/// Shared (`Arc`) values are elements when the inner type satisfies the element bounds.
impl<T> Element for Arc<T> where T: Debug + Clone + Send + Sync {}
#[derive(Error, Debug)]
pub enum QueueError<E> {
#[error("Failed to offer an element: {0:?}")]
OfferError(E),
#[error("Failed to pool an element")]
PoolError,
#[error("Failed to peek an element")]
PeekError,
}
/// The size of a queue: either unbounded or an exact element count.
///
/// The same type is used for both the current length and the capacity of a
/// queue (see `QueueBehavior::len` and `QueueBehavior::capacity`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum QueueSize {
    /// No upper bound.
    Limitless,
    /// Exactly this many elements.
    Limited(usize),
}

impl QueueSize {
    /// Adds one to a `Limited` count; `Limitless` is left untouched.
    ///
    /// The count saturates at `usize::MAX` rather than overflowing, so the
    /// behaviour is the same in debug and release builds.
    fn increment(&mut self) {
        if let QueueSize::Limited(count) = self {
            *count = count.saturating_add(1);
        }
    }

    /// Subtracts one from a `Limited` count; `Limitless` is left untouched.
    ///
    /// The count saturates at zero. A plain `-= 1` would panic in debug builds
    /// and wrap around to `usize::MAX` in release builds when the count is
    /// already zero.
    fn decrement(&mut self) {
        if let QueueSize::Limited(count) = self {
            *count = count.saturating_sub(1);
        }
    }
}

/// Ordering in which `Limitless` is greater than every `Limited` value and
/// `Limited` values compare by their count.
///
/// This cannot be derived: a derived ordering would follow declaration order
/// and place `Limitless` *below* `Limited`.
impl PartialOrd<QueueSize> for QueueSize {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        let ordering = match (self, other) {
            (QueueSize::Limitless, QueueSize::Limitless) => Ordering::Equal,
            (QueueSize::Limitless, QueueSize::Limited(_)) => Ordering::Greater,
            (QueueSize::Limited(_), QueueSize::Limitless) => Ordering::Less,
            (QueueSize::Limited(l), QueueSize::Limited(r)) => l.cmp(r),
        };
        Some(ordering)
    }
}
/// Operations common to every queue in this module.
///
/// Implementors supply `len`, `capacity`, `offer` and `poll`; the emptiness
/// and fullness predicates are derived from `len` and `capacity`.
pub trait QueueBehavior<E>: Send + Sized {
    /// Returns `true` when `len()` reports exactly `QueueSize::Limited(0)`.
    fn is_empty(&self) -> bool {
        matches!(self.len(), QueueSize::Limited(0))
    }

    /// Negation of [`QueueBehavior::is_empty`].
    fn non_empty(&self) -> bool {
        !self.is_empty()
    }

    /// Returns `true` when the capacity and the current length are equal.
    fn is_full(&self) -> bool {
        let capacity = self.capacity();
        let length = self.len();
        capacity == length
    }

    /// Negation of [`QueueBehavior::is_full`].
    fn non_full(&self) -> bool {
        !self.is_full()
    }

    /// Current number of elements, as a [`QueueSize`].
    fn len(&self) -> QueueSize;

    /// Capacity of the queue, as a [`QueueSize`].
    fn capacity(&self) -> QueueSize;

    /// Adds `e` to the queue, reporting failure through the returned `Result`.
    fn offer(&mut self, e: E) -> Result<()>;

    /// Removes an element from the queue.
    ///
    /// `Ok(None)` presumably means the queue was empty — confirm against the
    /// implementations.
    fn poll(&mut self) -> Result<Option<E>>;
}
/// Extension of [`QueueBehavior`] for queues that can expose an element
/// through a shared reference (`&self`).
pub trait HasPeekBehavior<E: Element>: QueueBehavior<E> {
/// Returns an element without requiring `&mut self`.
///
/// The value is returned by value (`E`), not by reference. `Ok(None)`
/// presumably means the queue is empty — confirm against the implementations.
fn peek(&self) -> Result<Option<E>>;
}
/// Extension of [`QueueBehavior`] adding `put` and `take`.
///
/// NOTE(review): judging by the trait name these are the blocking counterparts
/// of `offer` and `poll`; the blocking semantics are defined by the
/// implementations, not visible here — confirm there.
/// NOTE(review): the explicit `+ Send` is already implied by the
/// `QueueBehavior<E>: Send` supertrait bound.
pub trait BlockingQueueBehavior<E: Element>: QueueBehavior<E> + Send {
/// Inserts `e` into the queue, reporting failure through the returned `Result`.
fn put(&mut self, e: E) -> Result<()>;
/// Removes an element from the queue, reporting failure through the returned `Result`.
fn take(&mut self) -> Result<Option<E>>;
}
/// Selects which queue implementation `create_queue` should build.
///
/// The type is a plain field-less tag, so it derives the usual value-type
/// traits (`Debug`, `Clone`, `Copy`, `PartialEq`, `Eq`) for logging and
/// comparison.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum QueueType {
    /// Build a `QueueVec`.
    Vec,
    /// Build a `QueueMPSC`.
    MPSC,
}
/// A queue that is one of the concrete implementations provided by this module.
///
/// Each variant wraps the corresponding implementation type; the
/// `QueueBehavior` impl for this enum forwards every call to the wrapped value.
#[derive(Debug, Clone)]
pub enum Queue<T> {
/// Wraps a [`QueueVec`].
Vec(QueueVec<T>),
/// Wraps a [`QueueMPSC`].
MPSC(QueueMPSC<T>),
}
impl<T: Element + 'static> Queue<T> {
/// Consumes this queue and wraps it in a [`BlockingQueue`] by calling
/// `BlockingQueue::new(self)`.
pub fn with_blocking(self) -> BlockingQueue<T, Queue<T>> {
BlockingQueue::new(self)
}
}
/// Forwards every [`QueueBehavior`] operation to whichever implementation the
/// enum currently wraps.
impl<T: Element + 'static> QueueBehavior<T> for Queue<T> {
    /// Length reported by the wrapped queue.
    fn len(&self) -> QueueSize {
        match self {
            Self::Vec(q) => q.len(),
            Self::MPSC(q) => q.len(),
        }
    }

    /// Capacity reported by the wrapped queue.
    fn capacity(&self) -> QueueSize {
        match self {
            Self::Vec(q) => q.capacity(),
            Self::MPSC(q) => q.capacity(),
        }
    }

    /// Offers `e` to the wrapped queue and returns its result unchanged.
    fn offer(&mut self, e: T) -> Result<()> {
        match self {
            Self::Vec(q) => q.offer(e),
            Self::MPSC(q) => q.offer(e),
        }
    }

    /// Polls the wrapped queue and returns its result unchanged.
    fn poll(&mut self) -> Result<Option<T>> {
        match self {
            Self::Vec(q) => q.poll(),
            Self::MPSC(q) => q.poll(),
        }
    }
}
/// Builds a [`Queue`] of the requested kind.
///
/// When `num_elements` is `Some(n)`, the implementation is constructed with
/// `with_num_elements(n)`; when it is `None`, the implementation's `new()` is
/// used instead.
pub fn create_queue<T: Element + 'static>(queue_type: QueueType, num_elements: Option<usize>) -> Queue<T> {
    match queue_type {
        QueueType::Vec => {
            let backing = match num_elements {
                Some(limit) => QueueVec::<T>::with_num_elements(limit),
                None => QueueVec::<T>::new(),
            };
            Queue::Vec(backing)
        }
        QueueType::MPSC => {
            let backing = match num_elements {
                Some(limit) => QueueMPSC::<T>::with_num_elements(limit),
                None => QueueMPSC::<T>::new(),
            };
            Queue::MPSC(backing)
        }
    }
}
#[cfg(test)]
mod tests {
use std::thread::sleep;
use std::time::Duration;
use std::{env, thread};
use fp_rust::sync::CountDownLatch;
use crate::queue::BlockingQueueBehavior;
use crate::queue::{create_queue, QueueBehavior, QueueType};
// Test helper: sets the `RUST_LOG` environment variable to "debug" and then
// attempts to initialise `env_logger`.
fn init_logger() {
env::set_var("RUST_LOG", "debug");
// The result of `try_init` is deliberately discarded; presumably it reports an
// error when a logger is already installed — confirm against the env_logger docs.
let _ = env_logger::try_init();
}
// Drives a queue from two threads: one polls `MAX` times, the other offers the
// values 1..=MAX after a delay. Results are only logged, not asserted.
fn test_queue_vec<Q>(queue: Q)
where
    Q: QueueBehavior<i32> + Clone + 'static,
{
    const MAX: i32 = 5;

    let latch = CountDownLatch::new(1);
    let consumer_latch = latch.clone();

    let mut producer_queue = queue;
    let mut consumer_queue = producer_queue.clone();

    let consumer = thread::spawn(move || {
        // Signal the main thread that the consumer is running before polling.
        consumer_latch.countdown();
        for i in 1..=MAX {
            log::debug!("take: start: {}", i);
            let n = consumer_queue.poll();
            log::debug!("take: finish: {},{:?}", i, n);
        }
    });

    // Do not start the producer until the consumer thread has started.
    latch.wait();

    let producer = thread::spawn(move || {
        // Delay producing by three seconds; the consumer is already running by then.
        sleep(Duration::from_secs(3));
        for i in 1..=MAX {
            log::debug!("put: start: {}", i);
            producer_queue.offer(i).unwrap();
            log::debug!("put: finish: {}", i);
        }
    });

    consumer.join().unwrap();
    producer.join().unwrap();
}
// Drives a blocking queue from two threads: one calls `take` `MAX` times, the
// other offers the values 1..=MAX after a delay. Results are only logged, not
// asserted.
fn test_blocking_queue_vec<Q>(queue: Q)
where
    Q: BlockingQueueBehavior<i32> + Clone + 'static,
{
    const MAX: i32 = 5;

    let latch = CountDownLatch::new(1);
    let consumer_latch = latch.clone();

    let mut producer_queue = queue;
    let mut consumer_queue = producer_queue.clone();

    let consumer = thread::spawn(move || {
        // Signal the main thread that the consumer is running before taking.
        consumer_latch.countdown();
        for i in 1..=MAX {
            log::debug!("take: start: {}", i);
            let n = consumer_queue.take();
            log::debug!("take: finish: {},{:?}", i, n);
        }
    });

    // Do not start the producer until the consumer thread has started.
    latch.wait();

    let producer = thread::spawn(move || {
        // Delay producing by three seconds; the consumer is already running by then.
        sleep(Duration::from_secs(3));
        for i in 1..=MAX {
            log::debug!("put: start: {}", i);
            // Note: the producer side uses the non-blocking `offer`, not `put`.
            producer_queue.offer(i).unwrap();
            log::debug!("put: finish: {}", i);
        }
    });

    consumer.join().unwrap();
    producer.join().unwrap();
}
// Runs both two-thread scenarios against a `QueueType::Vec` queue created
// with `Some(32)`: first the plain queue, then the same kind wrapped with
// `with_blocking`.
#[test]
fn test() {
    init_logger();

    let plain = create_queue(QueueType::Vec, Some(32));
    test_queue_vec(plain);

    let blocking = create_queue(QueueType::Vec, Some(32)).with_blocking();
    test_blocking_queue_vec(blocking);
}
}