crossbeam-utils 0.8.14

Utilities for concurrent programming
Documentation
use crate::primitive::sync::atomic::AtomicUsize;
use crate::primitive::sync::{Arc, Condvar, Mutex};
use core::sync::atomic::Ordering::SeqCst;
use std::fmt;
use std::marker::PhantomData;
use std::time::{Duration, Instant};

/// A thread parking primitive.
///
/// Conceptually, each `Parker` has an associated token which is initially not present:
///
/// * The [`park`] method blocks the current thread unless or until the token is available, at
///   which point it automatically consumes the token.
///
/// * The [`park_timeout`] and [`park_deadline`] methods work the same as [`park`], but block for
///   a specified maximum time.
///
/// * The [`unpark`] method atomically makes the token available if it wasn't already. Because the
///   token is initially absent, [`unpark`] followed by [`park`] will result in the second call
///   returning immediately.
///
/// In other words, each `Parker` acts a bit like a spinlock that can be locked and unlocked using
/// [`park`] and [`unpark`].
///
/// A `Parker` is `Send` but not `Sync`: it can be moved to another thread, but it cannot be
/// shared by reference between threads. Hand out clones of its [`Unparker`] instead.
///
/// # Examples
///
/// ```
/// use std::thread;
/// use std::time::Duration;
/// use crossbeam_utils::sync::Parker;
///
/// let p = Parker::new();
/// let u = p.unparker().clone();
///
/// // Make the token available.
/// u.unpark();
/// // Wakes up immediately and consumes the token.
/// p.park();
///
/// thread::spawn(move || {
///     thread::sleep(Duration::from_millis(500));
///     u.unpark();
/// });
///
/// // Wakes up when `u.unpark()` provides the token.
/// p.park();
/// # std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371
/// ```
///
/// [`park`]: Parker::park
/// [`park_timeout`]: Parker::park_timeout
/// [`park_deadline`]: Parker::park_deadline
/// [`unpark`]: Unparker::unpark
pub struct Parker {
    // Handle to the shared parking state; every `park*` call goes through `unparker.inner`.
    unparker: Unparker,
    // `*const ()` is neither `Send` nor `Sync`, so this zero-sized marker switches off both auto
    // traits for `Parker`; `Send` is then re-added by an explicit `unsafe impl`.
    _marker: PhantomData<*const ()>,
}

// The `PhantomData<*const ()>` marker removes both auto traits from `Parker`; this restores `Send`
// only, so the type stays `!Sync`.
// NOTE(review): soundness rests on the only non-marker field, `Unparker`, being `Send`, which
// this file declares explicitly.
unsafe impl Send for Parker {}

impl Default for Parker {
    /// Builds a `Parker` with freshly allocated shared state and no token available.
    fn default() -> Self {
        // The state starts out `EMPTY`, i.e. the token is absent until `unpark` is called.
        let shared = Inner {
            state: AtomicUsize::new(EMPTY),
            lock: Mutex::new(()),
            cvar: Condvar::new(),
        };
        let unparker = Unparker {
            inner: Arc::new(shared),
        };

        Self {
            unparker,
            _marker: PhantomData,
        }
    }
}

impl Parker {
    /// Creates a new `Parker` whose token is initially absent.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_utils::sync::Parker;
    ///
    /// let p = Parker::new();
    /// ```
    pub fn new() -> Parker {
        Self::default()
    }

    /// Blocks the current thread until the token is made available.
    ///
    /// If the token is already available, it is consumed and the call returns immediately.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_utils::sync::Parker;
    ///
    /// let p = Parker::new();
    /// let u = p.unparker().clone();
    ///
    /// // Make the token available.
    /// u.unpark();
    ///
    /// // Wakes up immediately and consumes the token.
    /// p.park();
    /// ```
    pub fn park(&self) {
        self.unparker.inner.park(None);
    }

    /// Blocks the current thread until the token is made available, but only for a limited time.
    ///
    /// If `timeout` is so large that the resulting deadline cannot be represented as an
    /// [`Instant`] (for example `Duration::MAX`), the call behaves like [`park`] and waits
    /// without a time limit instead of panicking.
    ///
    /// # Examples
    ///
    /// ```
    /// use std::time::Duration;
    /// use crossbeam_utils::sync::Parker;
    ///
    /// let p = Parker::new();
    ///
    /// // Waits for the token to become available, but will not wait longer than 500 ms.
    /// p.park_timeout(Duration::from_millis(500));
    /// ```
    ///
    /// [`park`]: Parker::park
    pub fn park_timeout(&self, timeout: Duration) {
        // `Instant + Duration` panics on overflow, so compute the deadline with a checked add.
        match Instant::now().checked_add(timeout) {
            Some(deadline) => self.park_deadline(deadline),
            // The deadline is unrepresentably far in the future: treat it as "no timeout".
            None => self.park(),
        }
    }

    /// Blocks the current thread until the token is made available, or until a certain deadline.
    ///
    /// A `deadline` that has already passed does not block; a token that is already available
    /// is still consumed in that case.
    ///
    /// # Examples
    ///
    /// ```
    /// use std::time::{Duration, Instant};
    /// use crossbeam_utils::sync::Parker;
    ///
    /// let p = Parker::new();
    /// let deadline = Instant::now() + Duration::from_millis(500);
    ///
    /// // Waits for the token to become available, but will not wait longer than 500 ms.
    /// p.park_deadline(deadline);
    /// ```
    pub fn park_deadline(&self, deadline: Instant) {
        self.unparker.inner.park(Some(deadline))
    }

    /// Returns a reference to an associated [`Unparker`].
    ///
    /// The returned [`Unparker`] doesn't have to be used by reference - it can also be cloned.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_utils::sync::Parker;
    ///
    /// let p = Parker::new();
    /// let u = p.unparker().clone();
    ///
    /// // Make the token available.
    /// u.unpark();
    /// // Wakes up immediately and consumes the token.
    /// p.park();
    /// ```
    pub fn unparker(&self) -> &Unparker {
        &self.unparker
    }

    /// Converts a `Parker` into a raw pointer.
    ///
    /// Ownership of the shared state is transferred to the returned pointer; pass it to
    /// [`Parker::from_raw`] to get the `Parker` back and eventually release the state.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_utils::sync::Parker;
    ///
    /// let p = Parker::new();
    /// let raw = Parker::into_raw(p);
    /// # let _ = unsafe { Parker::from_raw(raw) };
    /// ```
    pub fn into_raw(this: Parker) -> *const () {
        Unparker::into_raw(this.unparker)
    }

    /// Converts a raw pointer into a `Parker`.
    ///
    /// # Safety
    ///
    /// This method is safe to use only with pointers returned by [`Parker::into_raw`].
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_utils::sync::Parker;
    ///
    /// let p = Parker::new();
    /// let raw = Parker::into_raw(p);
    /// let p = unsafe { Parker::from_raw(raw) };
    /// ```
    pub unsafe fn from_raw(ptr: *const ()) -> Parker {
        Parker {
            // The caller's contract (see `# Safety`) is forwarded unchanged to `Unparker`.
            unparker: Unparker::from_raw(ptr),
            _marker: PhantomData,
        }
    }
}

impl fmt::Debug for Parker {
    /// Writes an opaque placeholder; the internal state is deliberately not shown.
    fn fmt(&self, out: &mut fmt::Formatter<'_>) -> fmt::Result {
        const REPR: &str = "Parker { .. }";
        // `pad` honours width/fill/precision flags supplied by the caller.
        out.pad(REPR)
    }
}

/// Unparks a thread parked by the associated [`Parker`].
///
/// An `Unparker` is obtained from [`Parker::unparker`] and can be cloned; all clones refer to
/// the same `Parker`.
pub struct Unparker {
    // Parking state shared with the originating `Parker` and with every clone of this handle.
    inner: Arc<Inner>,
}

// `Unparker` only wraps an `Arc<Inner>`, and `Inner` consists of an atomic, a mutex and a condvar.
// NOTE(review): with the standard-library versions of those types these impls would be derived
// automatically; presumably they are spelled out for alternative `crate::primitive` backends —
// confirm.
unsafe impl Send for Unparker {}
unsafe impl Sync for Unparker {}

impl Unparker {
    /// Atomically makes the token available if it is not already.
    ///
    /// This method will wake up the thread blocked on [`park`] or [`park_timeout`], if there is
    /// any.
    ///
    /// # Examples
    ///
    /// ```
    /// use std::thread;
    /// use std::time::Duration;
    /// use crossbeam_utils::sync::Parker;
    ///
    /// let p = Parker::new();
    /// let u = p.unparker().clone();
    ///
    /// thread::spawn(move || {
    ///     thread::sleep(Duration::from_millis(500));
    ///     u.unpark();
    /// });
    ///
    /// // Wakes up when `u.unpark()` provides the token.
    /// p.park();
    /// # std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371
    /// ```
    ///
    /// [`park`]: Parker::park
    /// [`park_timeout`]: Parker::park_timeout
    pub fn unpark(&self) {
        self.inner.unpark()
    }

    /// Converts an `Unparker` into a raw pointer.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_utils::sync::{Parker, Unparker};
    ///
    /// let p = Parker::new();
    /// let u = p.unparker().clone();
    /// let raw = Unparker::into_raw(u);
    /// # let _ = unsafe { Unparker::from_raw(raw) };
    /// ```
    pub fn into_raw(this: Unparker) -> *const () {
        Arc::into_raw(this.inner).cast::<()>()
    }

    /// Converts a raw pointer into an `Unparker`.
    ///
    /// # Safety
    ///
    /// This method is safe to use only with pointers returned by [`Unparker::into_raw`].
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_utils::sync::{Parker, Unparker};
    ///
    /// let p = Parker::new();
    /// let u = p.unparker().clone();
    ///
    /// let raw = Unparker::into_raw(u);
    /// let u = unsafe { Unparker::from_raw(raw) };
    /// ```
    pub unsafe fn from_raw(ptr: *const ()) -> Unparker {
        Unparker {
            inner: Arc::from_raw(ptr.cast::<Inner>()),
        }
    }
}

impl fmt::Debug for Unparker {
    /// Writes an opaque placeholder; the internal state is deliberately not shown.
    fn fmt(&self, out: &mut fmt::Formatter<'_>) -> fmt::Result {
        const REPR: &str = "Unparker { .. }";
        // `pad` honours width/fill/precision flags supplied by the caller.
        out.pad(REPR)
    }
}

impl Clone for Unparker {
    fn clone(&self) -> Unparker {
        Unparker {
            inner: self.inner.clone(),
        }
    }
}

// Values stored in `Inner::state`.

/// No token is available and no thread is blocked; this is the initial state.
const EMPTY: usize = 0;
/// A thread has committed to blocking in `Inner::park` and is waiting (or about to wait) on the
/// condition variable.
const PARKED: usize = 1;
/// The token is available: `Inner::unpark` has run and its notification has not been consumed.
const NOTIFIED: usize = 2;

/// Parking state shared between a `Parker` and all of its `Unparker`s.
struct Inner {
    // One of `EMPTY`, `PARKED` or `NOTIFIED`.
    state: AtomicUsize,
    // Guards no data; it is held by the parking thread from before `state` becomes `PARKED`
    // until it waits on `cvar`, so that `unpark` can synchronize with that window.
    lock: Mutex<()>,
    // Condition variable the parked thread sleeps on; signalled by `unpark`.
    cvar: Condvar,
}

impl Inner {
    /// Blocks the calling thread until `state` becomes `NOTIFIED` or, when `deadline` is `Some`,
    /// until that instant has passed.
    ///
    /// On return `state` has been reset to `EMPTY`, i.e. a pending notification is consumed.
    /// Wakeups of the condition variable that are not accompanied by a notification are
    /// absorbed by the loop and do not cause an early return.
    ///
    /// # Panics
    ///
    /// Panics if `lock` is poisoned or if `state` holds a value that is impossible at that
    /// point of the protocol.
    fn park(&self, deadline: Option<Instant>) {
        // If we were previously notified then we consume this notification and return quickly.
        if self
            .state
            .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
            .is_ok()
        {
            return;
        }

        // If the timeout is zero, then there is no need to actually block.
        if let Some(deadline) = deadline {
            if deadline <= Instant::now() {
                return;
            }
        }

        // Otherwise we need to coordinate going to sleep.
        let mut m = self.lock.lock().unwrap();

        match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
            Ok(_) => {}
            // Consume this notification to avoid spurious wakeups in the next park.
            Err(NOTIFIED) => {
                // We must read `state` here, even though we know it will be `NOTIFIED`. This is
                // because `unpark` may have been called again since we read `NOTIFIED` in the
                // `compare_exchange` above. We must perform an acquire operation that synchronizes
                // with that `unpark` to observe any writes it made before the call to `unpark`. To
                // do that we must read from the write it made to `state`.
                let old = self.state.swap(EMPTY, SeqCst);
                assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
                return;
            }
            Err(n) => panic!("inconsistent park_timeout state: {}", n),
        }

        loop {
            // Block the current thread on the conditional variable.
            m = match deadline {
                None => self.cvar.wait(m).unwrap(),
                Some(deadline) => {
                    let now = Instant::now();
                    if now < deadline {
                        // We could check for a timeout here, in the return value of wait_timeout,
                        // but in the case that a timeout and an unpark arrive simultaneously, we
                        // prefer to report the former.
                        self.cvar.wait_timeout(m, deadline - now).unwrap().0
                    } else {
                        // We've timed out; swap out the state back to empty on our way out
                        match self.state.swap(EMPTY, SeqCst) {
                            NOTIFIED | PARKED => return,
                            n => panic!("inconsistent park_timeout state: {}", n),
                        };
                    }
                }
            };

            if self
                .state
                .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
                .is_ok()
            {
                // got a notification
                return;
            }

            // Spurious wakeup, go back to sleep. Alternatively, if we timed out, it will be caught
            // in the branch above, when we discover the deadline is in the past
        }
    }

    /// Sets `state` to `NOTIFIED` and, if a thread was `PARKED`, wakes it through `cvar`.
    ///
    /// # Panics
    ///
    /// Panics if `state` held an unknown value, or if `lock` is poisoned when a parked thread
    /// has to be woken.
    pub(crate) fn unpark(&self) {
        // To ensure the unparked thread will observe any writes we made before this call, we must
        // perform a release operation that `park` can synchronize with. To do that we must write
        // `NOTIFIED` even if `state` is already `NOTIFIED`. That is why this must be a swap rather
        // than a compare-and-swap that returns if it reads `NOTIFIED` on failure.
        match self.state.swap(NOTIFIED, SeqCst) {
            EMPTY => return,    // no one was waiting
            NOTIFIED => return, // already unparked
            PARKED => {}        // gotta go wake someone up
            _ => panic!("inconsistent state in unpark"),
        }

        // There is a period between when the parked thread sets `state` to `PARKED` (or last
        // checked `state` in the case of a spurious wakeup) and when it actually waits on `cvar`.
        // If we were to notify during this period it would be ignored and then when the parked
        // thread went to sleep it would never wake up. Fortunately, it has `lock` locked at this
        // stage so we can acquire `lock` to wait until it is ready to receive the notification.
        //
        // Releasing `lock` before the call to `notify_one` means that when the parked thread wakes
        // it doesn't get woken only to have to wait for us to release `lock`.
        drop(self.lock.lock().unwrap());
        self.cvar.notify_one();
    }
}