pub struct Queue<B, N = ()>
where B: Buffer,
{ /* private fields */ }
Expand description

A buffered MPSC “swap-buffer” queue.

Implementations§

source§

impl<B, N> Queue<B, N>
where B: Buffer, N: Default,

source

pub fn new() -> Self

Creates a new queue using the buffer’s default value.

The buffer’s default value may have a non-zero capacity, e.g. for an array buffer.

Examples
let queue: Queue<VecBuffer<usize>> = Queue::new();
source§

impl<B, N> Queue<B, N>
where B: Buffer + Resize, N: Default,

source

pub fn with_capacity(capacity: usize) -> Self

Creates a new queue with the given capacity.

Examples
let queue: Queue<VecBuffer<usize>> = Queue::with_capacity(42);
source§

impl<B, N> Queue<B, N>
where B: Buffer,

source

pub fn notify(&self) -> &N

Returns the queue’s Notify implementor.

Examples
use swap_buffer_queue::notify::Notify;

let queue: Queue<VecBuffer<usize>> = Queue::with_capacity(42);
queue.notify().notify_dequeue();
source

pub fn capacity(&self) -> usize

Returns the current enqueuing buffer capacity.

Examples
let queue: Queue<VecBuffer<usize>> = Queue::with_capacity(42);
assert_eq!(queue.capacity(), 42);
source

pub fn len(&self) -> usize

Returns the current enqueuing buffer length.

Examples
let queue: Queue<VecBuffer<usize>> = Queue::with_capacity(42);
assert_eq!(queue.len(), 0);
queue.try_enqueue([0]).unwrap();
assert_eq!(queue.len(), 1);
source

pub fn is_empty(&self) -> bool

Returns true if the current enqueuing buffer is empty.

Examples
let queue: Queue<VecBuffer<usize>> = Queue::with_capacity(42);
assert!(queue.is_empty());
source

pub fn is_closed(&self) -> bool

Returns true if the queue is closed.

Examples
let queue: Queue<VecBuffer<usize>> = Queue::with_capacity(42);
assert!(!queue.is_closed());
queue.close();
assert!(queue.is_closed());
source

pub fn reopen(&self)

Reopen a closed queue.

Calling this method when the queue is not closed has no effect.

Examples
let queue: Queue<VecBuffer<usize>> = Queue::with_capacity(42);
queue.close();
assert!(queue.is_closed());
queue.reopen();
assert!(!queue.is_closed());
source§

impl<B, N> Queue<B, N>
where B: Buffer, N: Notify,

source

pub fn try_enqueue<T>(&self, value: T) -> Result<(), TryEnqueueError<T>>
where T: InsertIntoBuffer<B>,

Tries enqueuing the given value into the queue.

Enqueuing will fail if the queue has insufficient capacity, or if it is closed. In case of success, it will notify waiting dequeuing operations using Notify::notify_dequeue.

Enqueuing a zero-sized value is a no-op.

Examples
let queue: Queue<VecBuffer<usize>> = Queue::with_capacity(1);
queue.try_enqueue([0]).unwrap();
// queue is full
assert_eq!(
    queue.try_enqueue([0]),
    Err(TryEnqueueError::InsufficientCapacity([0]))
);
// let's close the queue
queue.close();
assert_eq!(queue.try_enqueue([0]), Err(TryEnqueueError::Closed([0])));
source

pub fn try_dequeue(&self) -> Result<BufferSlice<'_, B, N>, TryDequeueError>

Tries dequeuing a buffer with all enqueued values from the queue.

This method swaps the current buffer with the other one, which is empty. All concurrent enqueuing must end before the current buffer is actually dequeuable, so the queue may be in a transitory state where try_dequeue must be retried. In this state, after a spin loop, this method will return a TryDequeueError::Pending error.

Dequeuing also fails if the queue is empty, or if it is closed. Moreover, as the algorithm is MPSC, dequeuing is protected against concurrent calls, failing with a TryDequeueError::Conflict error.

It returns a BufferSlice, which holds, as its name suggests, a reference to the dequeued buffer. This is why the concurrent dequeuing protection is maintained for the lifetime of the buffer slice.

Examples
let queue: Queue<VecBuffer<usize>> = Queue::with_capacity(42);
queue.try_enqueue([0]).unwrap();
queue.try_enqueue([1]).unwrap();
{
    let slice = queue.try_dequeue().unwrap();
    assert_eq!(slice.deref(), &[0, 1]);
    // dequeuing cannot be done concurrently (`slice` is still in scope)
    assert_eq!(queue.try_dequeue().unwrap_err(), TryDequeueError::Conflict);
}
// let's close the queue
queue.try_enqueue([2]).unwrap();
queue.close();
// queue can be dequeued while closed when not empty
{
    let slice = queue.try_dequeue().unwrap();
    assert_eq!(slice.deref(), &[2]);
}
assert_eq!(queue.try_dequeue().unwrap_err(), TryDequeueError::Closed)
source

pub fn close(&self)

Closes the queue.

A closed queue can no longer accept enqueuing, but it can still be dequeued while not empty. Calling this method on a closed queue has no effect. See reopen to reopen a closed queue.

Examples
let queue: Queue<VecBuffer<usize>> = Queue::with_capacity(42);
queue.try_enqueue([0]).unwrap();
queue.close();
assert!(queue.is_closed());
assert_eq!(queue.try_enqueue([1]), Err(TryEnqueueError::Closed([1])));
assert_eq!(queue.try_dequeue().unwrap().deref(), &[0]);
assert_eq!(queue.try_dequeue().unwrap_err(), TryDequeueError::Closed);
source§

impl<B, N> Queue<B, N>
where B: Buffer + Resize, N: Notify,

source

pub fn try_dequeue_and_resize<I>( &self, capacity: impl Into<Option<usize>>, insert: Option<impl FnOnce() -> I> ) -> Result<BufferSlice<'_, B, N>, TryDequeueError>

Tries dequeuing a buffer with all enqueued values from the queue, and resizes the next buffer to be used for enqueuing.

This method is an extension of the try_dequeue method. In fact, before swapping the buffers, the next one is empty and protected, so it can be resized, and it is also possible to add values to it before making it available for enqueuing. This can be used to make the queue unbounded.

It is worth noting that only one buffer is resized, so it can lead to asymmetric buffers.

Examples
let queue: Queue<VecBuffer<usize>> = Queue::with_capacity(1);
queue.try_enqueue([0]).unwrap();
// queue is full
assert_eq!(
    queue.try_enqueue([1]),
    Err(TryEnqueueError::InsufficientCapacity([1]))
);
// dequeue and resize, inserting elements before the buffer is available
{
    let slice = queue
        .try_dequeue_and_resize(3, Some(|| std::iter::once([42])))
        .unwrap();
    assert_eq!(slice.deref(), &[0]);
}
// capacity has been increased
queue.try_enqueue([1]).unwrap();
queue.try_enqueue([2]).unwrap();
let slice = queue.try_dequeue().unwrap();
assert_eq!(slice.deref(), &[42, 1, 2]);
An amortized unbounded recipe
/// Enqueues `value` into `queue`, falling back to the `overflow` vector when the queue
/// reports insufficient capacity.
///
/// Any result of `try_enqueue` other than `InsufficientCapacity` (success included) is
/// returned as-is. Panics if the `overflow` mutex is poisoned.
fn enqueue_unbounded<T>(
    queue: &Queue<VecBuffer<T>>,
    overflow: &Mutex<Vec<[T; 1]>>,
    mut value: T,
) -> Result<(), EnqueueError<[T; 1]>> {
    // first, try to enqueue normally; on insufficient capacity, take the value back
    match queue.try_enqueue([value]) {
        Err(TryEnqueueError::InsufficientCapacity([v])) => value = v,
        res => return res,
    };
    // the enqueuing failed for lack of capacity: lock the overflow
    let mut guard = overflow.lock().unwrap();
    // retry to enqueue (we never know what happened during lock acquisition)
    match queue.try_enqueue([value]) {
        Err(TryEnqueueError::InsufficientCapacity([v])) => value = v,
        res => return res,
    };
    // still no room: push the value to the overflow vector
    guard.push([value]);
    // release the overflow lock before notifying
    drop(guard);
    // notify a possibly waiting dequeue
    queue.notify().notify_dequeue();
    Ok(())
}

/// Tries dequeuing from `queue` while draining the `overflow` vector through the `insert`
/// argument of `try_dequeue_and_resize`.
///
/// The requested capacity is the current queue capacity plus the number of overflowed
/// values. Panics if the `overflow` mutex is poisoned.
fn try_dequeue_unbounded<'a, T>(
    queue: &'a Queue<VecBuffer<T>>,
    overflow: &Mutex<Vec<[T; 1]>>,
) -> Result<BufferSlice<'a, VecBuffer<T>, ()>, TryDequeueError> {
    // lock the overflow and use `try_dequeue_and_resize` to drain the overflow into the
    // queue; the lock is held until this function returns
    let mut guard = overflow.lock().unwrap();
    let vec = &mut guard;
    // `{ vec }` is a trick to get the correct FnOnce inference for the closure
    // https://stackoverflow.com/questions/74814588/why-does-rust-infer-fnmut-instead-of-fnonce-for-this-closure-even-though-inferr
    queue.try_dequeue_and_resize(queue.capacity() + vec.len(), Some(|| { vec }.drain(..)))
}

// queue is initialized with zero capacity
let queue: Queue<VecBuffer<usize>> = Queue::new();
let overflow = Mutex::new(Vec::new());
assert_eq!(queue.capacity(), 0);
enqueue_unbounded(&queue, &overflow, 0).unwrap();
assert_eq!(
    try_dequeue_unbounded(&queue, &overflow).unwrap().deref(),
    &[0]
);
enqueue_unbounded(&queue, &overflow, 1).unwrap();
enqueue_unbounded(&queue, &overflow, 2).unwrap();
assert_eq!(
    try_dequeue_unbounded(&queue, &overflow).unwrap().deref(),
    &[1, 2]
);
source§

impl<B> Queue<B, SynchronizedNotifier>
where B: Buffer,

source

pub fn enqueue<T>(&self, value: T) -> Result<(), EnqueueError<T>>
where T: InsertIntoBuffer<B>,

Available on crate feature std only.

Enqueues the given value inside the queue.

This method extends try_enqueue by waiting synchronously for a SynchronizedNotifier::notify_enqueue call, i.e. when a buffer is dequeued, in case of insufficient capacity.

Examples
let queue: Arc<SynchronizedQueue<VecBuffer<usize>>> =
    Arc::new(SynchronizedQueue::with_capacity(1));
queue.try_enqueue([0]).unwrap();
assert_eq!(
    queue.try_enqueue([1]),
    Err(TryEnqueueError::InsufficientCapacity([1]))
);
// queue is full, let's spawn an enqueuing task and dequeue
let queue_clone = queue.clone();
let task = std::thread::spawn(move || queue_clone.enqueue([1]));
std::thread::sleep(Duration::from_millis(1));
assert_eq!(queue.try_dequeue().unwrap().deref(), &[0]);
// enqueuing task has succeeded
task.join().unwrap().unwrap();
assert_eq!(queue.try_dequeue().unwrap().deref(), &[1]);
// let's close the queue
queue.try_enqueue([2]).unwrap();
let queue_clone = queue.clone();
let task = std::thread::spawn(move || queue_clone.enqueue([3]));
std::thread::sleep(Duration::from_millis(1));
queue.close();
assert_eq!(task.join().unwrap(), Err(EnqueueError::Closed([3])));
source

pub fn enqueue_timeout<T>( &self, value: T, timeout: Duration ) -> Result<(), TryEnqueueError<T>>
where T: InsertIntoBuffer<B>,

Available on crate feature std only.

Tries enqueuing the given value inside the queue with a timeout.

This method extends try_enqueue by waiting synchronously (with a timeout) for a SynchronizedNotifier::notify_enqueue call, i.e. when a buffer is dequeued, in case of insufficient capacity.

Examples
let queue: Arc<SynchronizedQueue<VecBuffer<usize>>> =
    Arc::new(SynchronizedQueue::with_capacity(1));
queue.try_enqueue([0]).unwrap();
assert_eq!(
    queue.enqueue_timeout([1], Duration::from_millis(1)),
    Err(TryEnqueueError::InsufficientCapacity([1]))
);
let queue_clone = queue.clone();
let task = std::thread::spawn(move || {
    std::thread::sleep(Duration::from_millis(1));
    queue_clone.try_dequeue().unwrap();
});
queue.enqueue_timeout([1], Duration::from_secs(1)).unwrap();
source

pub async fn enqueue_async<T>(&self, value: T) -> Result<(), EnqueueError<T>>
where T: InsertIntoBuffer<B>,

Available on crate feature std only.

Enqueues the given value inside the queue.

This method extends try_enqueue by waiting asynchronously for a SynchronizedNotifier::notify_enqueue call, i.e. when a buffer is dequeued, in case of insufficient capacity.

Examples
let queue: Arc<SynchronizedQueue<VecBuffer<usize>>> =
    Arc::new(SynchronizedQueue::with_capacity(1));
queue.try_enqueue([0]).unwrap();
assert_eq!(
    queue.try_enqueue([0]),
    Err(TryEnqueueError::InsufficientCapacity([0]))
);
// queue is full, let's spawn an enqueuing task and dequeue
let queue_clone = queue.clone();
let task = tokio::spawn(async move { queue_clone.enqueue_async([1]).await });
assert_eq!(queue.try_dequeue().unwrap().deref(), &[0]);
// enqueuing task has succeeded
task.await.unwrap().unwrap();
assert_eq!(queue.try_dequeue().unwrap().deref(), &[1]);
// let's close the queue
queue.try_enqueue([2]).unwrap();
let queue_clone = queue.clone();
let task = tokio::spawn(async move { queue_clone.enqueue_async([3]).await });
queue.close();
assert_eq!(task.await.unwrap(), Err(EnqueueError::Closed([3])));
source

pub fn dequeue( &self ) -> Result<BufferSlice<'_, B, SynchronizedNotifier>, DequeueError>

Available on crate feature std only.

Dequeues a buffer with all enqueued values from the queue.

This method extends try_dequeue by waiting synchronously for a SynchronizedNotifier::notify_dequeue call, i.e. when a value is enqueued, in case of an empty queue.

Examples
let queue: Arc<SynchronizedQueue<VecBuffer<usize>>> =
    Arc::new(SynchronizedQueue::with_capacity(1));
assert_eq!(queue.try_dequeue().unwrap_err(), TryDequeueError::Empty);
// queue is empty, let's spawn a dequeuing task and enqueue
let queue_clone = queue.clone();
let task = std::thread::spawn(move || {
    Ok::<_, DequeueError>(queue_clone.dequeue()?.into_iter().collect::<Vec<_>>())
});
queue.try_enqueue([0]).unwrap();
// dequeuing task has succeeded
assert_eq!(task.join().unwrap().unwrap().deref(), &[0]);
// let's close the queue
let queue_clone = queue.clone();
let task = std::thread::spawn(move || {
    Ok::<_, DequeueError>(queue_clone.dequeue()?.into_iter().collect::<Vec<_>>())
});
queue.close();
assert_eq!(task.join().unwrap().unwrap_err(), DequeueError::Closed);
source

pub fn dequeue_timeout( &self, timeout: Duration ) -> Result<BufferSlice<'_, B, SynchronizedNotifier>, TryDequeueError>

Available on crate feature std only.

Tries dequeuing a buffer with all enqueued values from the queue with a timeout.

This method extends try_dequeue by waiting synchronously, with a timeout, for a SynchronizedNotifier::notify_dequeue call, i.e. when a value is enqueued, in case of an empty queue.

Examples
let queue: Arc<SynchronizedQueue<VecBuffer<usize>>> =
    Arc::new(SynchronizedQueue::with_capacity(1));
assert_eq!(
    queue.dequeue_timeout(Duration::from_millis(1)).unwrap_err(),
    TryDequeueError::Empty
);
let queue_clone = queue.clone();
let task = std::thread::spawn(move || {
    std::thread::sleep(Duration::from_millis(1));
    queue_clone.try_enqueue([0]).unwrap();
});
assert_eq!(
    queue
        .dequeue_timeout(Duration::from_secs(1))
        .unwrap()
        .deref(),
    &[0]
);
source

pub async fn dequeue_async( &self ) -> Result<BufferSlice<'_, B, SynchronizedNotifier>, DequeueError>

Available on crate feature std only.

Dequeues a buffer with all enqueued values from the queue.

This method extends try_dequeue by waiting asynchronously for a SynchronizedNotifier::notify_dequeue call, i.e. when a value is enqueued, in case of an empty queue.

Examples
let queue: Arc<SynchronizedQueue<VecBuffer<usize>>> =
    Arc::new(SynchronizedQueue::with_capacity(1));
assert_eq!(queue.try_dequeue().unwrap_err(), TryDequeueError::Empty);
// queue is empty, let's spawn a dequeuing task and enqueue
let queue_clone = queue.clone();
let task = tokio::spawn(async move {
    Ok::<_, DequeueError>(
        queue_clone
            .dequeue_async()
            .await?
            .into_iter()
            .collect::<Vec<_>>(),
    )
});
queue.try_enqueue([0]).unwrap();
// dequeuing task has succeeded
assert_eq!(task.await.unwrap().unwrap().deref(), &[0]);
// let's close the queue
let queue_clone = queue.clone();
let task = tokio::spawn(async move {
    Ok::<_, DequeueError>(
        queue_clone
            .dequeue_async()
            .await?
            .into_iter()
            .collect::<Vec<_>>(),
    )
});
queue.close();
assert_eq!(task.await.unwrap().unwrap_err(), DequeueError::Closed);
source§

impl<B> Queue<B, SynchronizedNotifier>
where B: Buffer + Drain,

source

pub fn iter(&self) -> impl Iterator<Item = B::Value> + '_

Available on crate feature std only.

Returns an iterator over the elements of the queue (see BufferIter).

Examples
let queue: SynchronizedQueue<VecBuffer<usize>> = SynchronizedQueue::with_capacity(42);
queue.try_enqueue([0]).unwrap();
queue.try_enqueue([1]).unwrap();

let mut iter = queue.iter();
assert_eq!(iter.next(), Some(0));
drop(iter);
let mut iter = queue.iter();
assert_eq!(iter.next(), Some(1));
queue.close(); // close in order to stop the iterator
assert_eq!(iter.next(), None);
source

pub fn stream(&self) -> impl Stream<Item = B::Value> + '_

Available on crate features std and stream only.

Returns a stream over the elements of the queue (see BufferIter).

Examples
let queue: SynchronizedQueue<VecBuffer<usize>> = SynchronizedQueue::with_capacity(42);
queue.try_enqueue([0]).unwrap();
queue.try_enqueue([1]).unwrap();

let mut stream = Box::pin(queue.stream());
assert_eq!(stream.next().await, Some(0));
drop(stream);
let mut stream = Box::pin(queue.stream());
assert_eq!(stream.next().await, Some(1));
queue.close(); // close in order to stop the stream
assert_eq!(stream.next().await, None);

Trait Implementations§

source§

impl<B, N> AsRef<Queue<B, N>> for Queue<B, N>
where B: Buffer,

source§

fn as_ref(&self) -> &Queue<B, N>

Converts this type into a shared reference of the (usually inferred) input type.
source§

impl<B, N> Debug for Queue<B, N>
where B: Buffer, N: Debug,

source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more
source§

impl<B, N> Default for Queue<B, N>
where B: Buffer, N: Default,

source§

fn default() -> Self

Returns the “default value” for a type. Read more
source§

impl<B, N> Drop for Queue<B, N>
where B: Buffer,

source§

fn drop(&mut self)

Executes the destructor for this type. Read more
source§

impl<B, N> Send for Queue<B, N>
where B: Buffer + Send, N: Send,

source§

impl<B, N> Sync for Queue<B, N>
where B: Buffer + Send, N: Sync,

Auto Trait Implementations§

§

impl<B, N = ()> !RefUnwindSafe for Queue<B, N>

§

impl<B, N> Unpin for Queue<B, N>
where B: Unpin, N: Unpin,

§

impl<B, N> UnwindSafe for Queue<B, N>
where B: UnwindSafe, N: UnwindSafe,

Blanket Implementations§

source§

impl<T> Any for T
where T: 'static + ?Sized,

source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
source§

impl<T> Borrow<T> for T
where T: ?Sized,

source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
source§

impl<T> From<T> for T

source§

fn from(t: T) -> T

Returns the argument unchanged.

source§

impl<T, U> Into<U> for T
where U: From<T>,

source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

§

type Error = Infallible

The type returned in the event of a conversion error.
source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.