gil 0.9.0

A collection of high-performance, lock-free concurrent queues (SPSC, MPSC, MPMC, SPMC) with sync and async support
Documentation
use core::ptr::NonNull;
#[cfg(feature = "async")]
use core::task::Waker;

#[cfg(feature = "async")]
use futures::task::AtomicWaker;

use crate::{
    atomic::{AtomicUsize, Ordering},
    padded::Padded,
    queue::{Ownership, RefCounted, ShardOwnership},
};

#[derive(Default)]
#[repr(C)]
pub struct Head {
    head: Padded<AtomicUsize>,
    #[cfg(feature = "async")]
    receiver_waker: Padded<AtomicWaker>,
}

#[derive(Default)]
#[repr(C)]
pub struct Tail {
    tail: Padded<AtomicUsize>,
    #[cfg(feature = "async")]
    sender_waker: Padded<AtomicWaker>,
}

pub struct GetInit;

impl<T> crate::DropInitItems<Head, Tail, T> for GetInit {
    unsafe fn drop_init_items(
        head: NonNull<Head>,
        tail: NonNull<Tail>,
        _capaity: usize,
        at: impl Fn(usize) -> NonNull<T>,
    ) {
        if !core::mem::needs_drop::<T>() {
            return;
        }

        let (head, tail) = unsafe {
            let head = _field!(Head, head, head.value, AtomicUsize)
                .as_ref()
                .load(Ordering::Relaxed);
            let tail = _field!(Tail, tail, tail.value, AtomicUsize)
                .as_ref()
                .load(Ordering::Relaxed);
            (head, tail)
        };
        let len = tail.wrapping_sub(head);

        for i in 0..len {
            let idx = head.wrapping_add(i);
            unsafe { at(idx).drop_in_place() };
        }
    }
}

pub(crate) type QueuePtr<T, O = RefCounted> = crate::QueuePtr<Head, Tail, T, GetInit, O>;
pub(crate) type ShardQueuePtr<T> = QueuePtr<T, ShardOwnership>;
type Queue<O = RefCounted> = crate::Queue<Head, Tail, O>;

impl<T, O: Ownership> QueuePtr<T, O> {
    #[inline(always)]
    pub fn head(&self) -> &AtomicUsize {
        unsafe { _field!(Queue<O>, self.ptr, head.head.value, AtomicUsize).as_ref() }
    }

    #[inline(always)]
    pub fn tail(&self) -> &AtomicUsize {
        unsafe { _field!(Queue<O>, self.ptr, tail.tail.value, AtomicUsize).as_ref() }
    }
}

impl<T> ShardQueuePtr<T> {
    pub(crate) fn try_claim_producer(&self) -> Option<Self> {
        self.try_clone_as(ShardOwnership::PRODUCER)
    }

    pub(crate) fn try_claim_consumer(&self) -> Option<Self> {
        self.try_clone_as(ShardOwnership::CONSUMER)
    }
}

#[cfg(feature = "async")]
impl<T, O: Ownership> QueuePtr<T, O> {
    #[inline(always)]
    pub fn register_sender_waker(&self, waker: &Waker) {
        unsafe {
            _field!(Queue<O>, self.ptr, tail.sender_waker.value, AtomicWaker)
                .as_ref()
                .register(waker);
        }
    }

    #[inline(always)]
    pub fn register_receiver_waker(&self, waker: &Waker) {
        unsafe {
            _field!(Queue<O>, self.ptr, head.receiver_waker.value, AtomicWaker)
                .as_ref()
                .register(waker);
        }
    }

    #[inline(always)]
    pub(crate) fn wake_sender(&self) {
        unsafe {
            _field!(Queue<O>, self.ptr, tail.sender_waker.value, AtomicWaker)
                .as_ref()
                .wake();
        }
    }

    #[inline(always)]
    pub(crate) fn wake_receiver(&self) {
        unsafe {
            _field!(Queue<O>, self.ptr, head.receiver_waker.value, AtomicWaker)
                .as_ref()
                .wake();
        }
    }
}