Type Alias swap_buffer_queue::SynchronizedQueue
source · pub type SynchronizedQueue<B> = Queue<B, SynchronizedNotifier>;
Available on crate feature std only.
Aliased Type§
struct SynchronizedQueue<B> { /* private fields */ }
Implementations§
source§impl<B> SynchronizedQueue<B>where
B: Buffer,
impl<B> SynchronizedQueue<B>where
B: Buffer,
sourcepub fn enqueue<T>(&self, value: T) -> Result<(), EnqueueError<T>>where
T: InsertIntoBuffer<B>,
pub fn enqueue<T>(&self, value: T) -> Result<(), EnqueueError<T>>where
T: InsertIntoBuffer<B>,
Enqueues the given value inside the queue.
This method extends try_enqueue
by waiting synchronously for a
SynchronizedNotifier::notify_enqueue
call (i.e. until a buffer is dequeued) in case of
insufficient capacity.
Examples
let queue: Arc<SynchronizedQueue<VecBuffer<usize>>> =
Arc::new(SynchronizedQueue::with_capacity(1));
queue.try_enqueue([0]).unwrap();
assert_eq!(
queue.try_enqueue([1]),
Err(TryEnqueueError::InsufficientCapacity([1]))
);
// queue is full, let's spawn an enqueuing task and dequeue
let queue_clone = queue.clone();
let task = std::thread::spawn(move || queue_clone.enqueue([1]));
std::thread::sleep(Duration::from_millis(1));
assert_eq!(queue.try_dequeue().unwrap().deref(), &[0]);
// enqueuing task has succeeded
task.join().unwrap().unwrap();
assert_eq!(queue.try_dequeue().unwrap().deref(), &[1]);
// let's close the queue
queue.try_enqueue([2]).unwrap();
let queue_clone = queue.clone();
let task = std::thread::spawn(move || queue_clone.enqueue([3]));
std::thread::sleep(Duration::from_millis(1));
queue.close();
assert_eq!(task.join().unwrap(), Err(EnqueueError::Closed([3])));
sourcepub fn enqueue_timeout<T>(
&self,
value: T,
timeout: Duration
) -> Result<(), TryEnqueueError<T>>where
T: InsertIntoBuffer<B>,
pub fn enqueue_timeout<T>(
&self,
value: T,
timeout: Duration
) -> Result<(), TryEnqueueError<T>>where
T: InsertIntoBuffer<B>,
Tries enqueuing the given value inside the queue with a timeout.
This method extends try_enqueue
by waiting synchronously (with a
timeout) for a SynchronizedNotifier::notify_enqueue
call (i.e. until a buffer is dequeued) in case of
insufficient capacity.
Examples
let queue: Arc<SynchronizedQueue<VecBuffer<usize>>> =
Arc::new(SynchronizedQueue::with_capacity(1));
queue.try_enqueue([0]).unwrap();
assert_eq!(
queue.enqueue_timeout([1], Duration::from_millis(1)),
Err(TryEnqueueError::InsufficientCapacity([1]))
);
let queue_clone = queue.clone();
let task = std::thread::spawn(move || {
std::thread::sleep(Duration::from_millis(1));
queue_clone.try_dequeue().unwrap();
});
queue.enqueue_timeout([1], Duration::from_secs(1)).unwrap();
sourcepub async fn enqueue_async<T>(&self, value: T) -> Result<(), EnqueueError<T>>where
T: InsertIntoBuffer<B>,
pub async fn enqueue_async<T>(&self, value: T) -> Result<(), EnqueueError<T>>where
T: InsertIntoBuffer<B>,
Enqueues the given value inside the queue.
This method extends try_enqueue
by waiting asynchronously for a
SynchronizedNotifier::notify_enqueue
call (i.e. until a buffer is dequeued) in case of
insufficient capacity.
Examples
let queue: Arc<SynchronizedQueue<VecBuffer<usize>>> =
Arc::new(SynchronizedQueue::with_capacity(1));
queue.try_enqueue([0]).unwrap();
assert_eq!(
queue.try_enqueue([0]),
Err(TryEnqueueError::InsufficientCapacity([0]))
);
// queue is full, let's spawn an enqueuing task and dequeue
let queue_clone = queue.clone();
let task = tokio::spawn(async move { queue_clone.enqueue_async([1]).await });
assert_eq!(queue.try_dequeue().unwrap().deref(), &[0]);
// enqueuing task has succeeded
task.await.unwrap().unwrap();
assert_eq!(queue.try_dequeue().unwrap().deref(), &[1]);
// let's close the queue
queue.try_enqueue([2]).unwrap();
let queue_clone = queue.clone();
let task = tokio::spawn(async move { queue_clone.enqueue_async([3]).await });
queue.close();
assert_eq!(task.await.unwrap(), Err(EnqueueError::Closed([3])));
sourcepub fn dequeue(
&self
) -> Result<BufferSlice<'_, B, SynchronizedNotifier>, DequeueError>
pub fn dequeue( &self ) -> Result<BufferSlice<'_, B, SynchronizedNotifier>, DequeueError>
Dequeues a buffer with all enqueued values from the queue.
This method extends try_dequeue
by waiting synchronously for a
SynchronizedNotifier::notify_dequeue
call (i.e. until a value is enqueued) in case of
an empty queue.
Examples
let queue: Arc<SynchronizedQueue<VecBuffer<usize>>> =
Arc::new(SynchronizedQueue::with_capacity(1));
assert_eq!(queue.try_dequeue().unwrap_err(), TryDequeueError::Empty);
// queue is empty, let's spawn a dequeuing task and enqueue
let queue_clone = queue.clone();
let task = std::thread::spawn(move || {
Ok::<_, DequeueError>(queue_clone.dequeue()?.into_iter().collect::<Vec<_>>())
});
queue.try_enqueue([0]).unwrap();
// dequeuing task has succeeded
assert_eq!(task.join().unwrap().unwrap().deref(), &[0]);
// let's close the queue
let queue_clone = queue.clone();
let task = std::thread::spawn(move || {
Ok::<_, DequeueError>(queue_clone.dequeue()?.into_iter().collect::<Vec<_>>())
});
queue.close();
assert_eq!(task.join().unwrap().unwrap_err(), DequeueError::Closed);
sourcepub fn dequeue_timeout(
&self,
timeout: Duration
) -> Result<BufferSlice<'_, B, SynchronizedNotifier>, TryDequeueError>
pub fn dequeue_timeout( &self, timeout: Duration ) -> Result<BufferSlice<'_, B, SynchronizedNotifier>, TryDequeueError>
Tries dequeuing a buffer with all enqueued values from the queue with a timeout.
This method extends try_dequeue
by waiting synchronously (with a
timeout) for a SynchronizedNotifier::notify_dequeue
call (i.e. until a value is enqueued) in case of
an empty queue.
Examples
let queue: Arc<SynchronizedQueue<VecBuffer<usize>>> =
Arc::new(SynchronizedQueue::with_capacity(1));
assert_eq!(
queue.dequeue_timeout(Duration::from_millis(1)).unwrap_err(),
TryDequeueError::Empty
);
let queue_clone = queue.clone();
let task = std::thread::spawn(move || {
std::thread::sleep(Duration::from_millis(1));
queue_clone.try_enqueue([0]).unwrap();
});
assert_eq!(
queue
.dequeue_timeout(Duration::from_secs(1))
.unwrap()
.deref(),
&[0]
);
sourcepub async fn dequeue_async(
&self
) -> Result<BufferSlice<'_, B, SynchronizedNotifier>, DequeueError>
pub async fn dequeue_async( &self ) -> Result<BufferSlice<'_, B, SynchronizedNotifier>, DequeueError>
Dequeues a buffer with all enqueued values from the queue.
This method extends try_dequeue
by waiting asynchronously for a
SynchronizedNotifier::notify_dequeue
call (i.e. until a value is enqueued) in case of
an empty queue.
Examples
let queue: Arc<SynchronizedQueue<VecBuffer<usize>>> =
Arc::new(SynchronizedQueue::with_capacity(1));
assert_eq!(queue.try_dequeue().unwrap_err(), TryDequeueError::Empty);
// queue is empty, let's spawn a dequeuing task and enqueue
let queue_clone = queue.clone();
let task = tokio::spawn(async move {
Ok::<_, DequeueError>(
queue_clone
.dequeue_async()
.await?
.into_iter()
.collect::<Vec<_>>(),
)
});
queue.try_enqueue([0]).unwrap();
// dequeuing task has succeeded
assert_eq!(task.await.unwrap().unwrap().deref(), &[0]);
// let's close the queue
let queue_clone = queue.clone();
let task = tokio::spawn(async move {
Ok::<_, DequeueError>(
queue_clone
.dequeue_async()
.await?
.into_iter()
.collect::<Vec<_>>(),
)
});
queue.close();
assert_eq!(task.await.unwrap().unwrap_err(), DequeueError::Closed);
source§impl<B> SynchronizedQueue<B>
impl<B> SynchronizedQueue<B>
sourcepub fn iter(&self) -> impl Iterator<Item = B::Value> + '_
pub fn iter(&self) -> impl Iterator<Item = B::Value> + '_
Returns an iterator over the elements of the queue (see BufferIter).
Examples
let queue: SynchronizedQueue<VecBuffer<usize>> = SynchronizedQueue::with_capacity(42);
queue.try_enqueue([0]).unwrap();
queue.try_enqueue([1]).unwrap();
let mut iter = queue.iter();
assert_eq!(iter.next(), Some(0));
drop(iter);
let mut iter = queue.iter();
assert_eq!(iter.next(), Some(1));
queue.close(); // close in order to stop the iterator
assert_eq!(iter.next(), None);
sourcepub fn stream(&self) -> impl Stream<Item = B::Value> + '_
Available on crate feature stream
only.
pub fn stream(&self) -> impl Stream<Item = B::Value> + '_
Available on crate feature stream only.
Returns a stream over the elements of the queue (see BufferIter).
Examples
let queue: SynchronizedQueue<VecBuffer<usize>> = SynchronizedQueue::with_capacity(42);
queue.try_enqueue([0]).unwrap();
queue.try_enqueue([1]).unwrap();
let mut stream = Box::pin(queue.stream());
assert_eq!(stream.next().await, Some(0));
drop(stream);
let mut stream = Box::pin(queue.stream());
assert_eq!(stream.next().await, Some(1));
queue.close(); // close in order to stop the stream
assert_eq!(stream.next().await, None);