swap-queue 1.0.0

Swap Queue

A lock-free, thread-owned queue from which stealers take all queued tasks at once by swapping out the buffer. It is meant to be used as a thread_local paired with tokio::task::spawn, serving as a high-performance take-all batching mechanism. In benchmarks on ARM it is roughly 11-19% faster than crossbeam::deque::Worker and 28-45% faster than tokio::sync::mpsc.
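To illustrate the take-all idea, here is a minimal, single-threaded sketch of a thread-owned buffer and a stealer that swaps the whole buffer out in one step with std::mem::take. This is a simplification under stated assumptions, not swap-queue's implementation. The names SketchWorker and SketchStealer are hypothetical, a Mutex stands in for the crate's lock-free machinery, and take here is synchronous, whereas the example below awaits stealer.take(). Following the example's comment that a new stealer is issued for every buffer swap, push returns a stealer only when it starts a fresh batch.

```rust
use std::mem;
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for a thread-owned queue.
/// NOT lock-free: a Mutex guards the buffer purely to keep the sketch short.
struct SketchWorker<T> {
    buffer: Arc<Mutex<Vec<T>>>,
}

/// Hypothetical handle that takes the entire buffer in a single swap.
struct SketchStealer<T> {
    buffer: Arc<Mutex<Vec<T>>>,
}

impl<T> SketchWorker<T> {
    fn new() -> Self {
        SketchWorker {
            buffer: Arc::new(Mutex::new(Vec::new())),
        }
    }

    /// Queues a task. Returns a stealer only when this push starts a new
    /// batch, i.e. the buffer was empty (never used, or just swapped out).
    fn push(&self, task: T) -> Option<SketchStealer<T>> {
        let mut buf = self.buffer.lock().unwrap();
        let starts_batch = buf.is_empty();
        buf.push(task);
        if starts_batch {
            Some(SketchStealer {
                buffer: Arc::clone(&self.buffer),
            })
        } else {
            None
        }
    }
}

impl<T> SketchStealer<T> {
    /// Swaps the shared buffer for an empty one and returns every task
    /// queued so far. Consumes the stealer, so each batch is taken once.
    fn take(self) -> Vec<T> {
        let mut buf = self.buffer.lock().unwrap();
        mem::take(&mut *buf)
    }
}

fn main() {
    let worker = SketchWorker::new();
    let stealer = worker.push(1).expect("first push starts a batch");
    // Later pushes join the pending batch instead of issuing new stealers.
    assert!(worker.push(2).is_none());
    assert!(worker.push(3).is_none());

    let batch = stealer.take();
    println!("{:?}", batch); // [1, 2, 3]

    // The buffer was swapped out, so the next push starts a new batch.
    assert!(worker.push(4).is_some());
}
```

Because a stealer is handed out only when a push starts a fresh buffer, one consumer is spawned per batch, and every push that arrives before take runs is drained together.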

Example

use swap_queue::Worker;
use tokio::{
  runtime::Handle,
  sync::oneshot::{channel, Sender},
};

// Jemalloc makes this library substantially faster
#[global_allocator]
static GLOBAL: jemallocator::Jemalloc = jemallocator::Jemalloc;

// Worker needs to be thread local because it is !Sync
thread_local! {
  static QUEUE: Worker<(u64, Sender<u64>)> = Worker::new();
}

// Within an async context this batches optimally without added overhead, because take is polled only after everything else already scheduled, letting other pushes join the batch first
async fn push_echo(i: u64) -> u64 {
  {
    let (tx, rx) = channel();

    QUEUE.with(|queue| {
      // A new stealer is issued for every buffer swap
      if let Some(stealer) = queue.push((i, tx)) {
        Handle::current().spawn(async move {
          // Take the underlying buffer in entirety.
          let batch = stealer.take().await;

          // Some sort of batched operation, such as a database load

          batch.into_iter().for_each(|(i, tx)| {
            tx.send(i).ok();
          });
        });
      }
    });

    rx
  }
  .await
  .unwrap()
}
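The example keeps Worker in thread_local! because it is !Sync. The same constraint can be seen with a standard-library type: RefCell is also !Sync, so it cannot live in a plain static. The minimal sketch below uses only std (no swap-queue), and LOCAL_QUEUE and push_local are hypothetical names. It keeps one queue per thread and shows that a spawned thread gets its own, initially empty instance.

```rust
use std::cell::RefCell;
use std::thread;

// RefCell is !Sync, so it cannot be stored in a plain `static`;
// thread_local! gives every thread its own independent instance instead.
thread_local! {
    static LOCAL_QUEUE: RefCell<Vec<u64>> = RefCell::new(Vec::new());
}

/// Pushes onto the calling thread's queue and returns that queue's length.
fn push_local(value: u64) -> usize {
    LOCAL_QUEUE.with(|q| {
        let mut q = q.borrow_mut();
        q.push(value);
        q.len()
    })
}

fn main() {
    push_local(1);
    push_local(2);

    // A different thread sees its own queue, which starts out empty.
    let other_len = thread::spawn(|| push_local(3)).join().unwrap();
    let main_len = LOCAL_QUEUE.with(|q| q.borrow().len());

    println!("main thread: {main_len}, spawned thread: {other_len}"); // main thread: 2, spawned thread: 1
}
```

Since each thread owns its queue outright, pushes never contend across threads. Only the hand-off of a finished batch to a consumer needs cross-thread coordination.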

Benchmarks

Benchmarks were run on an AWS t4g.medium instance (Graviton2, ARM64) running the Amazon Linux 2 AMI (HVM).

CI runs the tests under ThreadSanitizer, LeakSanitizer, Miri and Loom.