Crate swap_queue
Expand description
A lock-free, thread-owned queue whose tasks are taken by stealers in their entirety via buffer swapping. It is meant to be used as a `thread_local`, paired with `tokio::task::spawn`, as a take-all batching mechanism that outperforms both `crossbeam::deque::Worker` and `tokio::sync::mpsc` for batching.
Example
use swap_queue::Worker;
use tokio::{
runtime::Handle,
sync::oneshot::{channel, Sender},
};
// Register jemalloc as the program's global allocator.
// Jemalloc makes this library substantially faster.
#[global_allocator]
static GLOBAL: jemallocator::Jemalloc = jemallocator::Jemalloc;
// Per-thread queue of `(value, reply sender)` pairs.
// Worker needs to be thread local because it is !Sync
thread_local! {
static QUEUE: Worker<(u64, Sender<u64>)> = Worker::new();
}
// Pushes `i` onto this thread's queue and resolves to the value sent back by the batch task.
// This mechanism batches optimally and without overhead inside an async context, because
// `take` polls after everything else that has been scheduled.
async fn push_echo(i: u64) -> u64 {
    let (reply_tx, reply_rx) = channel();

    QUEUE.with(|worker| {
        // A fresh stealer is issued for every buffer swap; spawn a task for it when one is returned.
        if let Some(stealer) = worker.push((i, reply_tx)) {
            let drain_batch = async move {
                // Claim the whole underlying buffer in one go.
                let entries = stealer.take().await;

                // Stand-in for a real batched operation, such as a database load.
                for (value, reply) in entries {
                    // The send result is deliberately discarded.
                    let _ = reply.send(value);
                }
            };
            Handle::current().spawn(drain_batch);
        }
    });

    // The `with` closure has already returned, so only the receiver is held across this await.
    reply_rx.await.unwrap()
}