use super::spsc::SpscQueue;
use super::{QueueResult, QueueStatsSnapshot};
#[derive(Debug, Clone, Copy)]
pub enum QueueType {
SingleProducer,
MultiProducer,
}
pub struct RtQueue<T: Copy> {
inner: RtQueueInner<T>,
}
enum RtQueueInner<T: Copy> {
Spsc(SpscQueue<T, 1024>), Mpsc(super::mpsc::MpscQueue<T>), }
impl<T: Copy + Default + Send + 'static> RtQueue<T> {
pub fn new(capacity: usize) -> Self {
if capacity <= 1024 {
Self {
inner: RtQueueInner::Spsc(SpscQueue::new()),
}
} else {
Self {
inner: RtQueueInner::Mpsc(super::mpsc::MpscQueue::with_capacity(capacity)),
}
}
}
pub fn new_spsc() -> Self {
Self {
inner: RtQueueInner::Spsc(SpscQueue::new()),
}
}
pub fn new_mpsc(capacity: usize) -> Self {
Self {
inner: RtQueueInner::Mpsc(super::mpsc::MpscQueue::with_capacity(capacity)),
}
}
pub fn push(&self, value: T) -> QueueResult<()> {
match &self.inner {
RtQueueInner::Spsc(q) => q.push(value),
RtQueueInner::Mpsc(q) => q.push(value),
}
}
pub fn pop(&self) -> Option<T> {
match &self.inner {
RtQueueInner::Spsc(q) => q.pop(),
RtQueueInner::Mpsc(q) => q.pop(),
}
}
pub fn len(&self) -> usize {
match &self.inner {
RtQueueInner::Spsc(q) => q.len(),
RtQueueInner::Mpsc(q) => q.size(),
}
}
pub fn capacity(&self) -> usize {
match &self.inner {
RtQueueInner::Spsc(q) => q.capacity(),
RtQueueInner::Mpsc(q) => q.capacity(),
}
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
pub fn stats(&self) -> QueueStatsSnapshot {
match &self.inner {
RtQueueInner::Spsc(q) => q.stats(),
RtQueueInner::Mpsc(_q) => {
QueueStatsSnapshot {
pushes: 0,
pops: 0,
overflows: 0,
underflows: 0,
max_size: 0,
}
}
}
}
}
impl<T: Copy> Clone for RtQueue<T> {
fn clone(&self) -> Self {
match &self.inner {
RtQueueInner::Spsc(_) => panic!("Cannot clone SPSC queue"),
RtQueueInner::Mpsc(q) => Self {
inner: RtQueueInner::Mpsc(super::mpsc::MpscQueue::with_capacity(q.capacity())),
},
}
}
}
#[allow(unsafe_code)]
unsafe impl<T: Copy + Send> Send for RtQueue<T> {}
#[allow(unsafe_code)]
unsafe impl<T: Copy + Sync> Sync for RtQueue<T> {}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_rt_queue_spsc() {
let queue = RtQueue::<i32>::new_spsc();
queue.push(42).unwrap();
assert_eq!(queue.pop(), Some(42));
assert_eq!(queue.pop(), None);
}
#[test]
fn test_rt_queue_mpsc() {
let queue = RtQueue::<i32>::new_mpsc(16);
queue.push(1).unwrap();
queue.push(2).unwrap();
queue.push(3).unwrap();
assert_eq!(queue.pop(), Some(1));
assert_eq!(queue.pop(), Some(2));
assert_eq!(queue.pop(), Some(3));
}
}