clickhouse-driver 0.1.0-alpha.3

Asynchronous tokio-based Yandex ClickHouse driver.
Documentation
//! A common utility for building synchronization primitives.
//!
//! When an async operation is blocked, it needs to register itself somewhere so that it can be
//! notified later on. The `WakerSet` type helps with keeping track of such async operations and
//! notifying them when they may make progress.

use crossbeam::utils::Backoff;
use slab::Slab;
use std::cell::UnsafeCell;
use std::ops::{Deref, DerefMut};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::task::{Context, Waker};

/// Set when the entry list is locked.
///
/// While this bit is set in `WakerSet::flag`, a `Lock` guard is alive and owns exclusive access
/// to the entry list.
// NOTE(review): `1 << 0` is presumably spelled out for symmetry with the other flag bits, which
// is why the `identity_op` lint is silenced here — confirm.
#[allow(clippy::identity_op)]
const LOCKED: usize = 1 << 0;

/// Set when there is at least one entry that has already been notified.
///
/// A notified entry is one whose waker slot is `None` but which has not been removed yet.
const NOTIFIED: usize = 1 << 1;

/// Set when there is at least one notifiable entry.
///
/// A notifiable entry is one whose waker slot still holds a `Waker`.
const NOTIFIABLE: usize = 1 << 2;

/// Inner representation of `WakerSet`.
///
/// This is the state stored inside the `UnsafeCell` of a `WakerSet`; it is accessed through a
/// `Lock` guard.
struct Inner {
    /// A list of entries in the set.
    ///
    /// Each entry has an optional waker associated with the task that is executing the operation.
    /// If the waker is set to `None`, that means the task has been woken up but hasn't removed
    /// itself from the `WakerSet` yet.
    ///
    /// The key of each entry is its index in the `Slab`.
    entries: Slab<Option<Waker>>,

    /// The number of notifiable entries, i.e. entries whose waker is still `Some`.
    ///
    /// Incremented when a waker is inserted and decremented whenever a waker is taken out of an
    /// entry or an entry that still holds a waker is removed.
    notifiable: usize,
}

/// A set holding wakers.
///
/// Blocked operations register a waker with `insert` and are later woken through one of the
/// `notify_*` methods.
pub struct WakerSet {
    /// Holds three bits: `LOCKED`, `NOTIFIED`, and `NOTIFIABLE`.
    ///
    /// `LOCKED` acts as a spin lock protecting `inner`; the other two bits are a snapshot of the
    /// entry list that is refreshed every time the lock is released.
    flag: AtomicUsize,

    /// The entries and bookkeeping, only accessed while the `LOCKED` bit is held.
    inner: UnsafeCell<Inner>,
}

// SAFETY: `inner` is only reached through a `Lock` guard, and `WakerSet::lock` hands one out only
// after atomically acquiring the `LOCKED` bit of `flag`, so accesses to the `UnsafeCell` coming
// from different threads are serialized.
unsafe impl Sync for WakerSet {}

// NOTE(review): presumably silences warnings for helpers that the rest of the crate does not
// call — confirm before removing.
#[allow(dead_code)]
impl WakerSet {
    /// Creates a new, empty `WakerSet` with no flag bits set.
    #[inline]
    pub fn new() -> WakerSet {
        WakerSet {
            flag: AtomicUsize::new(0),
            inner: UnsafeCell::new(Inner {
                entries: Slab::new(),
                notifiable: 0,
            }),
        }
    }

    /// Inserts a waker for a blocked operation and returns a key associated with it.
    ///
    /// The returned key identifies the entry in later calls to `remove`, `remove_if_notified`
    /// and `cancel`.
    #[cold]
    pub fn insert(&self, cx: &Context<'_>) -> usize {
        // Clone the waker before locking to keep the critical section short.
        let w = cx.waker().clone();
        let mut inner = self.lock();

        let key = inner.entries.insert(Some(w));
        inner.notifiable += 1;
        key
    }

    /// Removes the waker of an operation.
    ///
    /// # Panics
    ///
    /// Panics if `key` does not refer to an entry in the set.
    #[cold]
    pub fn remove(&self, key: usize) {
        let mut inner = self.lock();

        // Only entries that still hold a waker are counted as notifiable.
        if inner.entries.remove(key).is_some() {
            inner.notifiable -= 1;
        }
    }

    /// If the waker for this key is still waiting for a notification, then update
    /// the waker for the entry, and return false. If the waker has been notified,
    /// treat the entry as completed and return true.
    ///
    /// # Panics
    ///
    /// Panics if `key` does not refer to an entry in the set.
    pub fn remove_if_notified(&self, key: usize, cx: &Context<'_>) -> bool {
        let mut inner = self.lock();

        match &mut inner.entries[key] {
            None => {
                // Already notified: `notifiable` was decremented when the waker was taken.
                inner.entries.remove(key);
                true
            }
            Some(w) => {
                // We were never woken, so update instead
                if !w.will_wake(cx.waker()) {
                    *w = cx.waker().clone();
                }
                false
            }
        }
    }

    /// Removes the waker of a cancelled operation.
    ///
    /// If the cancelled operation had already been notified, the notification is passed on to
    /// the first entry that still holds a waker.
    ///
    /// Returns `true` if another blocked operation from the set was notified.
    ///
    /// # Panics
    ///
    /// Panics if `key` does not refer to an entry in the set.
    #[cold]
    pub fn cancel(&self, key: usize) -> bool {
        let mut inner = self.lock();

        match inner.entries.remove(key) {
            Some(_) => inner.notifiable -= 1,
            None => {
                // The operation was cancelled and notified so notify another operation instead.
                for (_, opt_waker) in inner.entries.iter_mut() {
                    // If there is no waker in this entry, that means it was already woken.
                    if let Some(w) = opt_waker.take() {
                        w.wake();
                        inner.notifiable -= 1;
                        return true;
                    }
                }
            }
        }

        false
    }

    /// Notifies a blocked operation if none have been notified already.
    ///
    /// The lock is only taken when the flag snapshot shows no notified entry and at least one
    /// notifiable entry.
    ///
    /// Returns `true` if an operation was notified.
    #[inline]
    pub fn notify_any(&self) -> bool {
        // Use `SeqCst` ordering to synchronize with `Lock::drop()`.
        let flag = self.flag.load(Ordering::SeqCst);

        if flag & NOTIFIED == 0 && flag & NOTIFIABLE != 0 {
            self.notify(Notify::Any)
        } else {
            false
        }
    }

    /// Notifies one additional blocked operation.
    ///
    /// The lock is only taken when the flag snapshot shows at least one notifiable entry.
    ///
    /// Returns `true` if an operation was notified.
    #[inline]
    pub fn notify_one(&self) -> bool {
        // Use `SeqCst` ordering to synchronize with `Lock::drop()`.
        if self.flag.load(Ordering::SeqCst) & NOTIFIABLE != 0 {
            self.notify(Notify::One)
        } else {
            false
        }
    }

    /// Notifies all blocked operations.
    ///
    /// The lock is only taken when the flag snapshot shows at least one notifiable entry.
    ///
    /// Returns `true` if at least one operation was notified.
    #[inline]
    pub fn notify_all(&self) -> bool {
        // Use `SeqCst` ordering to synchronize with `Lock::drop()`.
        if self.flag.load(Ordering::SeqCst) & NOTIFIABLE != 0 {
            self.notify(Notify::All)
        } else {
            false
        }
    }

    /// Notifies blocked operations according to the strategy `n`.
    ///
    /// * `Notify::All` wakes every entry that still holds a waker.
    /// * `Notify::One` wakes the first entry that still holds a waker and then stops.
    /// * `Notify::Any` only looks at the first entry yielded by the iteration, waking it if it
    ///   still holds a waker, and stops either way.
    ///
    /// Returns `true` if at least one operation was notified.
    #[cold]
    fn notify(&self, n: Notify) -> bool {
        // Reborrow the guard as `&mut Inner` so that `entries` and `notifiable` can be mutated
        // independently inside the loop; the guard temporary lives until the function returns.
        let inner = &mut *self.lock();
        let mut notified = false;

        for (_, opt_waker) in inner.entries.iter_mut() {
            // If there is no waker in this entry, that means it was already woken.
            if let Some(w) = opt_waker.take() {
                w.wake();
                inner.notifiable -= 1;
                notified = true;

                if n == Notify::One {
                    break;
                }
            }

            if n == Notify::Any {
                break;
            }
        }

        notified
    }

    /// Locks the list of entries.
    ///
    /// Spins until the `LOCKED` bit could be set by this caller, backing off between attempts.
    /// The lock is released, and the flag bits are refreshed, when the returned guard is dropped.
    fn lock(&self) -> Lock<'_> {
        let backoff = Backoff::new();
        while self.flag.fetch_or(LOCKED, Ordering::Acquire) & LOCKED != 0 {
            backoff.snooze();
        }
        Lock { waker_set: self }
    }

    /// Returns the number of entries that still hold a waker.
    ///
    /// Entries that were notified but not yet removed are not counted.
    pub fn len(&self) -> usize {
        let inner = self.lock();
        inner
            .entries
            .iter()
            .filter(|(_, item)| item.is_some())
            .count()
    }

    /// Returns `true` if no entry currently holds a waker.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }
}

/// A guard holding a `WakerSet` locked.
///
/// Dereferences to the `Inner` state of the set. Dropping the guard overwrites the set's flag
/// with a fresh `NOTIFIED`/`NOTIFIABLE` snapshot, which also clears the `LOCKED` bit.
struct Lock<'a> {
    /// The set whose `LOCKED` bit this guard holds.
    waker_set: &'a WakerSet,
}

impl Drop for Lock<'_> {
    /// Releases the lock by publishing a fresh flag value.
    ///
    /// The stored value never contains `LOCKED`, so the store doubles as the unlock.
    #[inline]
    fn drop(&mut self) {
        // Entries that still hold a waker versus entries that were woken but not removed yet.
        let notifiable = self.notifiable;
        let notified = self.entries.len() - notifiable;

        let mut bits = 0;
        if notified > 0 {
            bits |= NOTIFIED;
        }
        if notifiable > 0 {
            bits |= NOTIFIABLE;
        }

        // `SeqCst` pairs with the `SeqCst` flag loads performed by `WakerSet::notify_any()`,
        // `WakerSet::notify_one()` and `WakerSet::notify_all()`.
        self.waker_set.flag.store(bits, Ordering::SeqCst);
    }
}

impl Deref for Lock<'_> {
    type Target = Inner;

    /// Gives shared access to the state protected by the lock.
    #[inline]
    fn deref(&self) -> &Inner {
        let inner = self.waker_set.inner.get();
        // SAFETY: a `Lock` is created by `WakerSet::lock` only after the `LOCKED` bit has been
        // acquired, so this guard has exclusive access to `inner` for as long as it is alive.
        unsafe { &*inner }
    }
}

impl DerefMut for Lock<'_> {
    /// Gives exclusive access to the state protected by the lock.
    #[inline]
    fn deref_mut(&mut self) -> &mut Inner {
        let inner = self.waker_set.inner.get();
        // SAFETY: a `Lock` is created by `WakerSet::lock` only after the `LOCKED` bit has been
        // acquired, and `&mut self` guarantees no other reference obtained through this guard
        // is live, so handing out a unique reference is sound.
        unsafe { &mut *inner }
    }
}

/// Notification strategy used by `WakerSet::notify`.
#[derive(Clone, Copy, Eq, PartialEq)]
enum Notify {
    /// Make sure at least one entry is notified.
    ///
    /// Only the first entry of the set is examined: it is woken if it still holds a waker, and
    /// nothing happens if it has already been notified.
    Any,
    /// Notify one additional entry: the first one that still holds a waker.
    One,
    /// Notify all entries that still hold a waker.
    All,
}