Struct swap_buffer_queue::Queue
pub struct Queue<B, N = ()>
where
    B: Buffer,
{ /* private fields */ }
A buffered MPSC “swap-buffer” queue.
Implementations
impl<B, N> Queue<B, N>
pub fn with_capacity(capacity: usize) -> Self
Creates a new queue with the given capacity.
Examples
let queue: Queue<VecBuffer<usize>> = Queue::with_capacity(42);
impl<B, N> Queue<B, N>
where
    B: Buffer,
pub fn capacity(&self) -> usize
Returns the current enqueuing buffer capacity.
Examples
let queue: Queue<VecBuffer<usize>> = Queue::with_capacity(42);
assert_eq!(queue.capacity(), 42);
pub fn len(&self) -> usize
Returns the current enqueuing buffer length.
Examples
let queue: Queue<VecBuffer<usize>> = Queue::with_capacity(42);
assert_eq!(queue.len(), 0);
queue.try_enqueue([0]).unwrap();
assert_eq!(queue.len(), 1);
pub fn is_empty(&self) -> bool
Returns true if the current enqueuing buffer is empty.
Examples
let queue: Queue<VecBuffer<usize>> = Queue::with_capacity(42);
assert!(queue.is_empty());
impl<B, N> Queue<B, N>
pub fn try_enqueue<T>(&self, value: T) -> Result<(), TryEnqueueError<T>>
where
    T: InsertIntoBuffer<B>,
Tries enqueuing the given value into the queue.
Enqueuing fails if the queue has insufficient capacity or if it is closed. On success, waiting dequeuing operations are notified using Notify::notify_dequeue.
Enqueuing a zero-sized value is a no-op.
Examples
let queue: Queue<VecBuffer<usize>> = Queue::with_capacity(1);
queue.try_enqueue([0]).unwrap();
// queue is full
assert_eq!(
queue.try_enqueue([0]),
Err(TryEnqueueError::InsufficientCapacity([0]))
);
// let's close the queue
queue.close();
assert_eq!(queue.try_enqueue([0]), Err(TryEnqueueError::Closed([0])));
pub fn try_dequeue(&self) -> Result<BufferSlice<'_, B, N>, TryDequeueError>
Tries dequeuing a buffer with all enqueued values from the queue.
This method swaps the current buffer with the other one, which is empty. All concurrent enqueuing must finish before the current buffer is actually dequeuable, so the queue may be in a transitory state where try_dequeue must be retried. In this state, after a spin loop, this method returns a TryDequeueError::Pending error.
Dequeuing also fails if the queue is empty, or if it is closed and has no remaining values. Moreover, as the algorithm is MPSC, dequeuing is protected against concurrent calls, which fail with a TryDequeueError::Conflict error.
It returns a BufferSlice, which holds, as its name indicates, a reference to the dequeued buffer. For this reason, the concurrent dequeuing protection is maintained for the lifetime of the buffer slice.
Examples
let queue: Queue<VecBuffer<usize>> = Queue::with_capacity(42);
queue.try_enqueue([0]).unwrap();
queue.try_enqueue([1]).unwrap();
{
let slice = queue.try_dequeue().unwrap();
assert_eq!(slice.deref(), &[0, 1]);
// dequeuing cannot be done concurrently (`slice` is still in scope)
assert_eq!(queue.try_dequeue().unwrap_err(), TryDequeueError::Conflict);
}
// let's close the queue
queue.try_enqueue([2]).unwrap();
queue.close();
// queue can be dequeued while closed when not empty
{
let slice = queue.try_dequeue().unwrap();
assert_eq!(slice.deref(), &[2]);
}
assert_eq!(queue.try_dequeue().unwrap_err(), TryDequeueError::Closed)
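The buffer swap described for try_dequeue can be pictured with a deliberately simplified model. The sketch below is not the crate's implementation: it uses a Mutex where the real queue is lock-free, and the ToySwapQueue type and its methods are hypothetical names introduced only for illustration. It shows how one dequeue hands back every enqueued value at once by swapping the filled buffer with an empty one.

```rust
use std::sync::Mutex;

/// Two buffers: one receiving values, one kept empty for the next swap.
struct Buffers<T> {
    current: Vec<T>,
    spare: Vec<T>,
}

/// Hypothetical toy model of a swap-buffer queue (assumption: a `Mutex`
/// stands in for the crate's lock-free synchronization).
pub struct ToySwapQueue<T> {
    buffers: Mutex<Buffers<T>>,
    capacity: usize,
}

impl<T> ToySwapQueue<T> {
    pub fn with_capacity(capacity: usize) -> Self {
        Self {
            buffers: Mutex::new(Buffers {
                current: Vec::with_capacity(capacity),
                spare: Vec::with_capacity(capacity),
            }),
            capacity,
        }
    }

    /// Pushes into the current buffer, giving the value back if it is full.
    pub fn try_enqueue(&self, value: T) -> Result<(), T> {
        let mut buffers = self.buffers.lock().unwrap();
        if buffers.current.len() >= self.capacity {
            return Err(value);
        }
        buffers.current.push(value);
        Ok(())
    }

    /// Swaps the buffers and returns everything enqueued so far.
    pub fn try_dequeue(&self) -> Option<Vec<T>> {
        let mut guard = self.buffers.lock().unwrap();
        if guard.current.is_empty() {
            return None;
        }
        let buffers = &mut *guard;
        // The empty spare becomes the enqueuing buffer.
        std::mem::swap(&mut buffers.current, &mut buffers.spare);
        // Hand the filled buffer to the caller and keep a fresh empty spare.
        Some(std::mem::replace(
            &mut buffers.spare,
            Vec::with_capacity(self.capacity),
        ))
    }
}
```

Unlike this model, the real queue returns a borrowed BufferSlice rather than an owned Vec, which is why a second dequeue is rejected while the slice is alive.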
pub fn close(&self)
Closes the queue.
A closed queue no longer accepts enqueuing, but it can still be dequeued while it is not empty.
Calling this method on a closed queue has no effect.
See reopen to reopen a closed queue.
Examples
let queue: Queue<VecBuffer<usize>> = Queue::with_capacity(42);
queue.try_enqueue([0]).unwrap();
queue.close();
assert!(queue.is_closed());
assert_eq!(queue.try_enqueue([1]), Err(TryEnqueueError::Closed([1])));
assert_eq!(queue.try_dequeue().unwrap().deref(), &[0]);
assert_eq!(queue.try_dequeue().unwrap_err(), TryDequeueError::Closed);
impl<B, N> Queue<B, N>
pub fn try_dequeue_and_resize<I>(
    &self,
    capacity: impl Into<Option<usize>>,
    insert: Option<impl FnOnce() -> I>
) -> Result<BufferSlice<'_, B, N>, TryDequeueError>
Tries dequeuing a buffer with all enqueued values from the queue, and resizes the next buffer to be used for enqueuing.
This method is an extension of the try_dequeue method. Before the buffers are swapped, the next one is empty and protected, so it can be resized, and values can also be added to it before it is made available for enqueuing.
This can be used to make the queue unbounded.
Note that only one buffer is resized, which can lead to asymmetric buffers.
Examples
let queue: Queue<VecBuffer<usize>> = Queue::with_capacity(1);
queue.try_enqueue([0]).unwrap();
// queue is full
assert_eq!(
queue.try_enqueue([1]),
Err(TryEnqueueError::InsufficientCapacity([1]))
);
// dequeue and resize, inserting elements before the buffer is available
{
let slice = queue
.try_dequeue_and_resize(3, Some(|| std::iter::once([42])))
.unwrap();
assert_eq!(slice.deref(), &[0]);
}
// capacity has been increased
queue.try_enqueue([1]).unwrap();
queue.try_enqueue([2]).unwrap();
let slice = queue.try_dequeue().unwrap();
assert_eq!(slice.deref(), &[42, 1, 2]);
An amortized unbounded recipe
fn enqueue_unbounded<T>(
queue: &Queue<VecBuffer<T>>,
overflow: &Mutex<Vec<[T; 1]>>,
mut value: T,
) -> Result<(), EnqueueError<[T; 1]>> {
// first, try to enqueue normally
match queue.try_enqueue([value]) {
Err(TryEnqueueError::InsufficientCapacity([v])) => value = v,
res => return res,
};
// if the enqueuing fails, lock the overflow
let mut guard = overflow.lock().unwrap();
// retry to enqueue (we never know what happened during lock acquisition)
match queue.try_enqueue([value]) {
Err(TryEnqueueError::InsufficientCapacity([v])) => value = v,
res => return res,
};
// then push the values to the overflow vector
guard.push([value]);
drop(guard);
// notify possible waiting dequeue
queue.notify().notify_dequeue();
Ok(())
}
fn try_dequeue_unbounded<'a, T>(
queue: &'a Queue<VecBuffer<T>>,
overflow: &Mutex<Vec<[T; 1]>>,
) -> Result<BufferSlice<'a, VecBuffer<T>, ()>, TryDequeueError> {
// lock the overflow and use `try_dequeue_and_resize` to drain the overflow into the
// queue
let mut guard = overflow.lock().unwrap();
let vec = &mut guard;
// `{ vec }` is a trick to get the correct FnOnce inference
// https://stackoverflow.com/questions/74814588/why-does-rust-infer-fnmut-instead-of-fnonce-for-this-closure-even-though-inferr
queue.try_dequeue_and_resize(queue.capacity() + vec.len(), Some(|| { vec }.drain(..)))
}
// queue is initialized with zero capacity
let queue: Queue<VecBuffer<usize>> = Queue::new();
let overflow = Mutex::new(Vec::new());
assert_eq!(queue.capacity(), 0);
enqueue_unbounded(&queue, &overflow, 0).unwrap();
assert_eq!(
try_dequeue_unbounded(&queue, &overflow).unwrap().deref(),
&[0]
);
enqueue_unbounded(&queue, &overflow, 1).unwrap();
enqueue_unbounded(&queue, &overflow, 2).unwrap();
assert_eq!(
try_dequeue_unbounded(&queue, &overflow).unwrap().deref(),
&[1, 2]
);
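The remark about asymmetric buffers can be made concrete with a small model that tracks only the two capacities. This is a sketch under assumptions, not the crate's internals: TwoBuffers, swap, and swap_and_resize are hypothetical names, and the model ignores the stored values entirely.

```rust
/// Hypothetical model tracking only the capacities of the two buffers.
pub struct TwoBuffers {
    capacities: [usize; 2],
    /// Index of the buffer currently used for enqueuing.
    current: usize,
}

impl TwoBuffers {
    pub fn with_capacity(capacity: usize) -> Self {
        Self {
            capacities: [capacity; 2],
            current: 0,
        }
    }

    /// Capacity of the current enqueuing buffer.
    pub fn capacity(&self) -> usize {
        self.capacities[self.current]
    }

    /// Plain swap, as a dequeue without resizing would do.
    pub fn swap(&mut self) {
        self.current ^= 1;
    }

    /// Resizes only the next buffer, then makes it the enqueuing buffer.
    pub fn swap_and_resize(&mut self, capacity: usize) {
        let next = self.current ^ 1;
        self.capacities[next] = capacity;
        self.current = next;
    }
}
```

In this model, after one swap_and_resize(3) on buffers of capacity 1, later plain swaps alternate the enqueuing capacity between 1 and 3, since the other buffer was never resized.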
impl<B> Queue<B, SynchronizedNotifier>
where
    B: Buffer,
pub fn enqueue<T>(&self, value: T) -> Result<(), EnqueueError<T>>
where
    T: InsertIntoBuffer<B>,
Available on crate feature std only.
Enqueues the given value inside the queue.
This method extends try_enqueue: in case of insufficient capacity, it waits synchronously for a SynchronizedNotifier::notify_enqueue call, i.e. until a buffer is dequeued.
Examples
let queue: Arc<SynchronizedQueue<VecBuffer<usize>>> =
Arc::new(SynchronizedQueue::with_capacity(1));
queue.try_enqueue([0]).unwrap();
assert_eq!(
queue.try_enqueue([1]),
Err(TryEnqueueError::InsufficientCapacity([1]))
);
// queue is full, let's spawn an enqueuing task and dequeue
let queue_clone = queue.clone();
let task = std::thread::spawn(move || queue_clone.enqueue([1]));
std::thread::sleep(Duration::from_millis(1));
assert_eq!(queue.try_dequeue().unwrap().deref(), &[0]);
// enqueuing task has succeeded
task.join().unwrap().unwrap();
assert_eq!(queue.try_dequeue().unwrap().deref(), &[1]);
// let's close the queue
queue.try_enqueue([2]).unwrap();
let queue_clone = queue.clone();
let task = std::thread::spawn(move || queue_clone.enqueue([3]));
std::thread::sleep(Duration::from_millis(1));
queue.close();
assert_eq!(task.join().unwrap(), Err(EnqueueError::Closed([3])));
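The waiting behaviour of enqueue can be illustrated with a standard-library condition variable. The sketch below is an assumption-laden model, not how SynchronizedNotifier is implemented: BlockingToyQueue and its methods are hypothetical, and a Mutex plus Condvar replaces the crate's notification mechanism. It shows a producer blocking while the buffer is full and being woken either by a dequeue or by a close.

```rust
use std::sync::{Condvar, Mutex};

struct State<T> {
    items: Vec<T>,
    closed: bool,
}

/// Hypothetical blocking queue modelling "wait until a buffer is dequeued".
pub struct BlockingToyQueue<T> {
    state: Mutex<State<T>>,
    capacity: usize,
    /// Signalled when space is freed or the queue is closed.
    space_available: Condvar,
}

impl<T> BlockingToyQueue<T> {
    pub fn with_capacity(capacity: usize) -> Self {
        Self {
            state: Mutex::new(State {
                items: Vec::with_capacity(capacity),
                closed: false,
            }),
            capacity,
            space_available: Condvar::new(),
        }
    }

    /// Blocks while the queue is full; returns the value back if closed.
    pub fn enqueue(&self, value: T) -> Result<(), T> {
        let guard = self.state.lock().unwrap();
        let mut state = self
            .space_available
            .wait_while(guard, |s| !s.closed && s.items.len() >= self.capacity)
            .unwrap();
        if state.closed {
            return Err(value);
        }
        state.items.push(value);
        Ok(())
    }

    /// Takes every enqueued value and wakes blocked producers.
    pub fn dequeue_all(&self) -> Vec<T> {
        let mut state = self.state.lock().unwrap();
        let items = std::mem::take(&mut state.items);
        drop(state);
        self.space_available.notify_all();
        items
    }

    /// Marks the queue closed and wakes blocked producers so they can fail.
    pub fn close(&self) {
        self.state.lock().unwrap().closed = true;
        self.space_available.notify_all();
    }
}
```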
pub fn enqueue_timeout<T>(
    &self,
    value: T,
    timeout: Duration
) -> Result<(), TryEnqueueError<T>>
where
    T: InsertIntoBuffer<B>,
Available on crate feature std only.
Tries enqueuing the given value inside the queue with a timeout.
This method extends try_enqueue: in case of insufficient capacity, it waits synchronously, with a timeout, for a SynchronizedNotifier::notify_enqueue call, i.e. until a buffer is dequeued.
Examples
let queue: Arc<SynchronizedQueue<VecBuffer<usize>>> =
Arc::new(SynchronizedQueue::with_capacity(1));
queue.try_enqueue([0]).unwrap();
assert_eq!(
queue.enqueue_timeout([1], Duration::from_millis(1)),
Err(TryEnqueueError::InsufficientCapacity([1]))
);
let queue_clone = queue.clone();
let task = std::thread::spawn(move || {
std::thread::sleep(Duration::from_millis(1));
queue_clone.try_dequeue().unwrap();
});
queue.enqueue_timeout([1], Duration::from_secs(1)).unwrap();
pub async fn enqueue_async<T>(&self, value: T) -> Result<(), EnqueueError<T>>
where
    T: InsertIntoBuffer<B>,
Available on crate feature std only.
Enqueues the given value inside the queue.
This method extends try_enqueue: in case of insufficient capacity, it waits asynchronously for a SynchronizedNotifier::notify_enqueue call, i.e. until a buffer is dequeued.
Examples
let queue: Arc<SynchronizedQueue<VecBuffer<usize>>> =
Arc::new(SynchronizedQueue::with_capacity(1));
queue.try_enqueue([0]).unwrap();
assert_eq!(
queue.try_enqueue([0]),
Err(TryEnqueueError::InsufficientCapacity([0]))
);
// queue is full, let's spawn an enqueuing task and dequeue
let queue_clone = queue.clone();
let task = tokio::spawn(async move { queue_clone.enqueue_async([1]).await });
assert_eq!(queue.try_dequeue().unwrap().deref(), &[0]);
// enqueuing task has succeeded
task.await.unwrap().unwrap();
assert_eq!(queue.try_dequeue().unwrap().deref(), &[1]);
// let's close the queue
queue.try_enqueue([2]).unwrap();
let queue_clone = queue.clone();
let task = tokio::spawn(async move { queue_clone.enqueue_async([3]).await });
queue.close();
assert_eq!(task.await.unwrap(), Err(EnqueueError::Closed([3])));
pub fn dequeue(
    &self
) -> Result<BufferSlice<'_, B, SynchronizedNotifier>, DequeueError>
Available on crate feature std only.
Dequeues a buffer with all enqueued values from the queue.
This method extends try_dequeue: if the queue is empty, it waits synchronously for a SynchronizedNotifier::notify_dequeue call, i.e. until a value is enqueued.
Examples
let queue: Arc<SynchronizedQueue<VecBuffer<usize>>> =
Arc::new(SynchronizedQueue::with_capacity(1));
assert_eq!(queue.try_dequeue().unwrap_err(), TryDequeueError::Empty);
// queue is empty, let's spawn a dequeuing task and enqueue
let queue_clone = queue.clone();
let task = std::thread::spawn(move || {
Ok::<_, DequeueError>(queue_clone.dequeue()?.into_iter().collect::<Vec<_>>())
});
queue.try_enqueue([0]).unwrap();
// dequeuing task has succeeded
assert_eq!(task.join().unwrap().unwrap().deref(), &[0]);
// let's close the queue
let queue_clone = queue.clone();
let task = std::thread::spawn(move || {
Ok::<_, DequeueError>(queue_clone.dequeue()?.into_iter().collect::<Vec<_>>())
});
queue.close();
assert_eq!(task.join().unwrap().unwrap_err(), DequeueError::Closed);
pub fn dequeue_timeout(
    &self,
    timeout: Duration
) -> Result<BufferSlice<'_, B, SynchronizedNotifier>, TryDequeueError>
Available on crate feature std only.
Tries dequeuing a buffer with all enqueued values from the queue with a timeout.
This method extends try_dequeue: if the queue is empty, it waits synchronously, with a timeout, for a SynchronizedNotifier::notify_dequeue call, i.e. until a value is enqueued.
Examples
let queue: Arc<SynchronizedQueue<VecBuffer<usize>>> =
Arc::new(SynchronizedQueue::with_capacity(1));
assert_eq!(
queue.dequeue_timeout(Duration::from_millis(1)).unwrap_err(),
TryDequeueError::Empty
);
let queue_clone = queue.clone();
let task = std::thread::spawn(move || {
std::thread::sleep(Duration::from_millis(1));
queue_clone.try_enqueue([0]).unwrap();
});
assert_eq!(
queue
.dequeue_timeout(Duration::from_secs(1))
.unwrap()
.deref(),
&[0]
);
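A timed wait of this kind can be sketched with Condvar::wait_timeout_while from the standard library. As before, this is only a model under assumptions: TimedToyQueue and its methods are hypothetical names, and returning None on timeout stands in for the TryDequeueError::Empty result shown above.

```rust
use std::sync::{Condvar, Mutex};
use std::time::Duration;

/// Hypothetical queue modelling "wait for a value, but give up after a timeout".
pub struct TimedToyQueue<T> {
    items: Mutex<Vec<T>>,
    /// Signalled each time a value is enqueued.
    not_empty: Condvar,
}

impl<T> TimedToyQueue<T> {
    pub fn new() -> Self {
        Self {
            items: Mutex::new(Vec::new()),
            not_empty: Condvar::new(),
        }
    }

    /// Stores the value and wakes one waiting consumer.
    pub fn enqueue(&self, value: T) {
        self.items.lock().unwrap().push(value);
        self.not_empty.notify_one();
    }

    /// Waits at most `timeout` for the queue to become non-empty, then
    /// returns every value; returns `None` if it stayed empty.
    pub fn dequeue_timeout(&self, timeout: Duration) -> Option<Vec<T>> {
        let guard = self.items.lock().unwrap();
        let (mut items, result) = self
            .not_empty
            .wait_timeout_while(guard, timeout, |items| items.is_empty())
            .unwrap();
        if result.timed_out() {
            return None;
        }
        Some(std::mem::take(&mut *items))
    }
}
```

Using wait_timeout_while rather than a bare wait_timeout keeps the elapsed time accounted for across spurious wakeups, so the total wait does not exceed the requested timeout by repeated restarts.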
pub async fn dequeue_async(
    &self
) -> Result<BufferSlice<'_, B, SynchronizedNotifier>, DequeueError>
Available on crate feature std only.
Dequeues a buffer with all enqueued values from the queue.
This method extends try_dequeue: if the queue is empty, it waits asynchronously for a SynchronizedNotifier::notify_dequeue call, i.e. until a value is enqueued.
Examples
let queue: Arc<SynchronizedQueue<VecBuffer<usize>>> =
Arc::new(SynchronizedQueue::with_capacity(1));
assert_eq!(queue.try_dequeue().unwrap_err(), TryDequeueError::Empty);
// queue is empty, let's spawn a dequeuing task and enqueue
let queue_clone = queue.clone();
let task = tokio::spawn(async move {
Ok::<_, DequeueError>(
queue_clone
.dequeue_async()
.await?
.into_iter()
.collect::<Vec<_>>(),
)
});
queue.try_enqueue([0]).unwrap();
// dequeuing task has succeeded
assert_eq!(task.await.unwrap().unwrap().deref(), &[0]);
// let's close the queue
let queue_clone = queue.clone();
let task = tokio::spawn(async move {
Ok::<_, DequeueError>(
queue_clone
.dequeue_async()
.await?
.into_iter()
.collect::<Vec<_>>(),
)
});
queue.close();
assert_eq!(task.await.unwrap().unwrap_err(), DequeueError::Closed);
impl<B> Queue<B, SynchronizedNotifier>
pub fn iter(&self) -> impl Iterator<Item = B::Value> + '_
Available on crate feature std only.
Returns an iterator over the elements of the queue (see BufferIter).
Examples
let queue: SynchronizedQueue<VecBuffer<usize>> = SynchronizedQueue::with_capacity(42);
queue.try_enqueue([0]).unwrap();
queue.try_enqueue([1]).unwrap();
let mut iter = queue.iter();
assert_eq!(iter.next(), Some(0));
drop(iter);
let mut iter = queue.iter();
assert_eq!(iter.next(), Some(1));
queue.close(); // close in order to stop the iterator
assert_eq!(iter.next(), None);
pub fn stream(&self) -> impl Stream<Item = B::Value> + '_
Available on crate features std and stream only.
Returns a stream over the elements of the queue (see BufferIter).
Examples
let queue: SynchronizedQueue<VecBuffer<usize>> = SynchronizedQueue::with_capacity(42);
queue.try_enqueue([0]).unwrap();
queue.try_enqueue([1]).unwrap();
let mut stream = Box::pin(queue.stream());
assert_eq!(stream.next().await, Some(0));
drop(stream);
let mut stream = Box::pin(queue.stream());
assert_eq!(stream.next().await, Some(1));
queue.close(); // close in order to stop the stream
assert_eq!(stream.next().await, None);