wtx 0.44.1

A collection of different transport implementations and related tools focused primarily on web technologies.
Documentation
use crate::sync::AtomicUsize;
use core::{
  cell::UnsafeCell,
  fmt::{self, Debug},
  sync::atomic::Ordering,
  task::Waker,
};

// Idle. No registration or wake operation is in progress.
const WAITING: usize = 0;
// A task is in the middle of storing a waker or is already registered.
const REGISTERING: usize = 0b01;
// A signal has been issued with `take` to awake a registered waker.
const WAKING: usize = 0b10;

/// [Waker] that can be shared across tasks.
pub struct AtomicWaker {
  state: AtomicUsize,
  waker: UnsafeCell<Option<Waker>>,
}

impl AtomicWaker {
  /// Creates an idle instance.
  #[inline]
  pub const fn new() -> Self {
    AtomicWaker { state: AtomicUsize::new(WAITING), waker: UnsafeCell::new(None) }
  }

  /// Checks if a waker is currently registered.
  ///
  /// This is a best-effort because the state may change immediately after this call.
  #[inline]
  pub fn is_registered(&self) -> bool {
    // Use Relaxed?
    self.state.load(Ordering::Acquire) != WAITING
  }

  /// Registers the waker to be notified on calls to `wake`.
  #[inline]
  pub fn register(&self, waker: &Waker) {
    let prev_state = self
      .state
      .compare_exchange(WAITING, REGISTERING, Ordering::Acquire, Ordering::Acquire)
      .unwrap_or_else(|el| el);
    match prev_state {
      WAITING => {
        // SAFETY: `compare_exchange` manages concurrent accesses.
        let waker_opt = unsafe { &mut *self.waker.get() };
        // `waker_opt` is `Some` when `take` (struct's method) sets `WAITING` but `take`
        // (option's method) wasn't called yet, which should be rare. In this scenario a clone
        // operation is avoid if both wakers refer the same task.
        if !matches!(waker_opt, Some(elem) if elem.will_wake(waker)) {
          *waker_opt = Some(waker.clone());
        }
        let prev_state_is_not_waiting = self
          .state
          .compare_exchange(REGISTERING, WAITING, Ordering::AcqRel, Ordering::Acquire)
          .is_err();
        if prev_state_is_not_waiting {
          let Some(local_waker) = waker_opt.take() else {
            return;
          };
          // Swap because data did not changed while state is `REGISTERING or `WAITING`
          let _ = self.state.swap(WAITING, Ordering::AcqRel);
          local_waker.wake();
        }
      }
      WAKING => {
        waker.wake_by_ref();
      }
      _ => {}
    }
  }

  /// Returns the last [Waker] passed to [`Self::register`], if any.
  #[inline]
  pub fn take(&self) -> Option<Waker> {
    match self.state.fetch_or(WAKING, Ordering::AcqRel) {
      WAITING => {
        // SAFETY: lock was acquire through `fetch_or` so the last waker can be retrieved.
        let waker = unsafe { (*self.waker.get()).take() };
        let _ = self.state.swap(WAITING, Ordering::Release);
        waker
      }
      _ => None,
    }
  }

  /// Consumes the last [Waker] passed in [`Self::register`], if any.
  #[inline]
  pub fn wake(&self) {
    if let Some(waker) = self.take() {
      waker.wake();
    }
  }
}

impl Debug for AtomicWaker {
  #[inline]
  fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
    write!(f, "AtomicWaker")
  }
}

impl Default for AtomicWaker {
  #[inline]
  fn default() -> Self {
    AtomicWaker::new()
  }
}

// SAFETY: concurrent access is manually managed
unsafe impl Send for AtomicWaker {}

// SAFETY: concurrent access is manually managed
unsafe impl Sync for AtomicWaker {}

#[cfg(test)]
mod tests {
  use crate::{
    executor::Runtime,
    sync::{Arc, AtomicBool, AtomicWaker},
  };
  use core::{future::poll_fn, sync::atomic::Ordering, task::Poll};
  use std::thread;

  #[test]
  fn non_blocking_operation() {
    let atomic_waker = Arc::new(AtomicWaker::new());
    let atomic_waker_clone = atomic_waker.clone();

    let waiting = Arc::new(AtomicBool::new(false));
    let waiting_clone = waiting.clone();

    let woken = Arc::new(AtomicBool::new(false));
    let woken_clone = woken.clone();

    let jh = thread::spawn(move || {
      let mut pending = 0;
      Runtime::new().block_on(poll_fn(move |cx| {
        if woken_clone.load(Ordering::Relaxed) {
          Poll::Ready(())
        } else {
          assert_eq!(0, pending);
          pending += 1;
          atomic_waker_clone.register(cx.waker());
          waiting_clone.store(true, Ordering::Relaxed);
          Poll::Pending
        }
      }));
    });

    while !waiting.load(Ordering::Relaxed) {}

    thread::yield_now();
    woken.store(true, Ordering::Relaxed);
    atomic_waker.wake();
    jh.join().unwrap();
  }
}