use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};
/// State shared between a [`PickFuture`] and the code that delivers its value.
///
/// Both fields are read and written under the mutex that wraps this struct.
// NOTE(review): `dead_code` is silenced here, presumably because not every
// field/item is referenced from non-test code in this crate — confirm.
#[allow(dead_code)]
pub(crate) struct PickState<T> {
    /// The delivered value: `None` until [`deliver`] stores one, and `None`
    /// again after the future's `poll` has taken it.
    pub result: Option<T>,
    /// Waker recorded by a pending poll; [`deliver`] takes it and wakes it.
    pub waker: Option<Waker>,
}
/// Future that resolves with a value of type `T` once one has been placed in
/// its shared [`PickState`].
// NOTE(review): `dead_code` is silenced here, presumably because the type is
// not constructed from non-test code visible in this file — confirm.
#[allow(dead_code)]
#[must_use = "futures do nothing unless polled"]
pub(crate) struct PickFuture<T> {
    /// Mutex-protected slot shared with whoever delivers the value.
    pub state: Arc<Mutex<PickState<T>>>,
}
impl<T: Send + 'static> Future for PickFuture<T> {
    type Output = T;

    /// Resolves with the stored value once one is present in the shared state.
    ///
    /// If a value is present it is taken out of the slot and returned as
    /// `Poll::Ready`. Otherwise the caller's waker is recorded in the shared
    /// state and `Poll::Pending` is returned.
    ///
    /// Because the value is *taken*, polling again after completion finds the
    /// slot empty and returns `Poll::Pending`.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // A poisoned mutex is recovered instead of propagated: the inner state
        // is used as-is even if another holder of the lock panicked.
        let mut guard = self
            .state
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());

        if let Some(value) = guard.result.take() {
            return Poll::Ready(value);
        }

        // Only replace the stored waker when it would not already wake the
        // current task; this avoids a clone + drop on every spurious re-poll.
        let needs_registration = match &guard.waker {
            Some(existing) => !existing.will_wake(cx.waker()),
            None => true,
        };
        if needs_registration {
            guard.waker = Some(cx.waker().clone());
        }
        Poll::Pending
    }
}
/// Stores `value` in the shared state and wakes the waker recorded there, if any.
///
/// A previously stored value that has not been taken yet is overwritten.
/// A poisoned mutex is recovered rather than treated as an error.
// NOTE(review): `dead_code` is silenced here, presumably because no non-test
// caller is visible in this file — confirm.
#[allow(dead_code)]
pub(crate) fn deliver<T>(state: &Arc<Mutex<PickState<T>>>, value: T) {
    let mut shared = state
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    shared.result = Some(value);
    let pending = shared.waker.take();
    // Release the lock before waking so the wake call never runs while the
    // mutex is held.
    drop(shared);

    if let Some(task) = pending {
        task.wake();
    }
}
/// Creates a [`PickFuture`] together with a handle to the state it shares.
///
/// The returned state starts with no value and no waker; passing the handle to
/// [`deliver`] is what lets the future resolve.
// NOTE(review): `dead_code` is silenced here, presumably because no non-test
// caller is visible in this file — confirm.
#[allow(dead_code)]
pub(crate) fn new_pick_future<T>() -> (PickFuture<T>, Arc<Mutex<PickState<T>>>) {
    let shared = Arc::new(Mutex::new(PickState {
        result: None,
        waker: None,
    }));
    // The second reference is the delivery handle; the future keeps the first.
    let handle = Arc::clone(&shared);
    (PickFuture { state: shared }, handle)
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::task::{RawWaker, RawWakerVTable};

    /// Builds a waker whose clone returns the same inert vtable and whose
    /// wake / wake-by-ref / drop entries do nothing.
    fn noop_waker() -> Waker {
        fn ignore(_: *const ()) {}
        fn duplicate(data: *const ()) -> RawWaker {
            RawWaker::new(data, &INERT_VTABLE)
        }
        const INERT_VTABLE: RawWakerVTable =
            RawWakerVTable::new(duplicate, ignore, ignore, ignore);

        let raw = RawWaker::new(std::ptr::null(), &INERT_VTABLE);
        // SAFETY: none of the vtable functions dereference the data pointer,
        // so a null pointer is sound here.
        unsafe { Waker::from_raw(raw) }
    }

    /// Polling before any value has been delivered must report `Pending`.
    #[test]
    fn future_returns_pending_before_delivery() {
        let (mut future, _state) = new_pick_future::<i32>();
        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);

        let outcome = Pin::new(&mut future).poll(&mut cx);
        assert!(
            outcome.is_pending(),
            "future should be Pending before deliver()"
        );
    }

    /// A value delivered before the first poll is returned by the future.
    #[test]
    fn future_returns_ready_after_delivery() {
        let (future, state) = new_pick_future::<i32>();
        deliver(&state, 42);

        let resolved = pollster::block_on(future);
        assert_eq!(resolved, 42);
    }

    /// Same as above, with an owned `String` payload.
    #[test]
    fn future_returns_ready_after_delivery_with_pollster() {
        let (future, state) = new_pick_future::<String>();
        deliver(&state, String::from("hello"));

        let resolved = pollster::block_on(future);
        assert_eq!(resolved, "hello");
    }

    /// After a pending poll has recorded a waker, `deliver` must wake it.
    #[test]
    fn deliver_wakes_the_waker() {
        use std::sync::atomic::{AtomicBool, Ordering};

        static WOKEN: AtomicBool = AtomicBool::new(false);

        fn discard(_: *const ()) {}
        fn record_wake(_: *const ()) {
            WOKEN.store(true, Ordering::SeqCst);
        }
        fn duplicate(data: *const ()) -> RawWaker {
            RawWaker::new(data, &RECORDING_VTABLE)
        }
        const RECORDING_VTABLE: RawWakerVTable =
            RawWakerVTable::new(duplicate, record_wake, record_wake, discard);

        WOKEN.store(false, Ordering::SeqCst);
        let (mut future, state) = new_pick_future::<i32>();

        // SAFETY: none of the vtable functions dereference the data pointer,
        // so a null pointer is sound here.
        let waker =
            unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &RECORDING_VTABLE)) };
        let mut cx = Context::from_waker(&waker);

        // First poll only serves to record the waker in the shared state.
        let _ = Pin::new(&mut future).poll(&mut cx);
        deliver(&state, 99);

        assert!(WOKEN.load(Ordering::SeqCst), "waker should have been called");
    }
}