# swap-buffer-queue

A buffering MPSC queue.
This library is intended to be a (better, I hope) alternative to traditional MPSC queues in the context of a buffering consumer, by moving the buffering part directly into the queue.

It is especially well suited for IO-writing workflows; see the buffer implementations below.

The crate is `no_std` (some buffer implementations may require `std`).
## Example
```rust
use std::ops::Deref;

use swap_buffer_queue::{buffer::VecBuffer, Queue};

// Initialize the queue with a capacity
let queue: Queue<VecBuffer<usize>> = Queue::with_capacity(42);
// Enqueue some values
queue.try_enqueue([0]).unwrap();
queue.try_enqueue([1]).unwrap();
// Dequeue a slice to the enqueued values
let slice = queue.try_dequeue().unwrap();
assert_eq!(slice.deref(), &[0, 1]);
// Enqueued values can also be retrieved
assert_eq!(slice.into_iter().collect::<Vec<_>>(), vec![0, 1]);
```
## Buffer implementations

In addition to the simple `ArrayBuffer` and `VecBuffer`, this crate provides useful write-oriented implementations.
### write
`WriteArrayBuffer` and `WriteVecBuffer` are well suited when objects are serialized with a known serialization size. Indeed, objects can then be serialized directly into the queue's buffer, avoiding extra allocation.
```rust
use std::io::Write;

use swap_buffer_queue::{write::WriteVecBuffer, Queue};

// the slice to be written in the queue's buffer (not too complex for the example)
let to_write: &[u8] = &[0; 256];
// Creates a WriteVecBuffer queue with a 2-bytes header
let queue: Queue<WriteVecBuffer<2>> = Queue::with_capacity(1 << 16);
queue.try_enqueue(to_write).unwrap();
queue.try_enqueue(to_write).unwrap();
let mut slice = queue.try_dequeue().unwrap();
// Adds a header with the len of the buffer
let len = (slice.len() as u16).to_be_bytes();
slice.header().copy_from_slice(&len);
// Let's pretend we have a writer
let mut writer: Vec<u8> = Default::default();
assert_eq!(writer.write(&slice).unwrap(), slice.len());
```
### write_vectored
`WriteVectoredArrayBuffer` and `WriteVectoredVecBuffer` allow buffering a slice of `IoSlice`, saving the cost of dequeuing io-slices one by one to collect them afterward. (Internally, two buffers are used: one for the values and one for the io-slices.)

As a convenience, the total size of the buffered io-slices can be retrieved.
```rust
use std::io::{IoSlice, Write};

use swap_buffer_queue::{write_vectored::WriteVectoredVecBuffer, Queue};

// Creates a WriteVectoredVecBuffer queue
let queue: Queue<WriteVectoredVecBuffer<Vec<u8>>> = Queue::with_capacity(100);
queue.try_enqueue(vec![0; 256]).unwrap();
queue.try_enqueue(vec![42; 42]).unwrap();
let mut slice = queue.try_dequeue().unwrap();
// Adds a header with the total size of the slices
let total_size = (slice.total_size() as u16).to_be_bytes();
let mut frame = slice.frame(.., Some(&total_size), None);
// Let's pretend we have a writer
let mut writer: Vec<u8> = Default::default();
assert_eq!(writer.write_vectored(&mut frame).unwrap(), 300);
```
## How it works
Internally, this queue uses two buffers: one is used for enqueuing while the other is being dequeued.

When `Queue::try_enqueue` is called, a slot in the current enqueuing buffer is atomically reserved, and the value is then written into that slot.

When `Queue::try_dequeue` is called, the two buffers are atomically swapped, so the dequeued buffer contains the previously enqueued values, while newly enqueued values go to the other (empty) buffer.

Because the two-phase enqueuing (reserve, then write) cannot be atomic, the queue can be in a transitory state where slots have been reserved but not yet written. This is mitigated with a spin loop in the dequeuing method; if the spin loop fails, the transitory state is saved and the spin loop is retried at the next dequeue.
## Performance
swap-buffer-queue is very performant: it's actually the fastest MPSC queue I know of. Here are the results of a forked version of the crossbeam benchmark:
| benchmark | crossbeam | swap-buffer-queue |
|---|---|---|
| bounded1_mpsc | 1.545s | 1.341s |
| bounded1_spsc | 1.652s | 1.138s |
| bounded_mpsc | 0.362s | 0.178s |
| bounded_seq | 0.190s | 0.156s |
| bounded_spsc | 0.115s | 0.139s |
However, a large enough capacity is required to reach maximum performance; otherwise, high-contention scenarios may be penalized. This is because the algorithm puts all the contention on a single atomic integer (instead of two for crossbeam).