use pin_project::pin_project;
use std::{
future::Future,
mem,
pin::Pin,
sync::{
atomic::{
AtomicBool,
Ordering,
},
Arc,
},
task::{
Context,
Poll,
RawWaker,
RawWakerVTable,
Waker,
},
};
/// Wraps `future` in a [`SimpleRunner`] that can be polled by hand.
///
/// The runner starts out awake (its flag is initialised to `true`), and the
/// waker handed to the future on every poll shares that same flag.
pub fn runner<F: Future>(future: F) -> SimpleRunner<F> {
    let is_awake = Arc::new(AtomicBool::new(true));
    // The waker gets its own strong reference to the flag.
    let cached_waker = waker::create(Arc::clone(&is_awake));
    SimpleRunner {
        future,
        is_awake,
        cached_waker,
    }
}
/// A future bundled with a wake flag and a waker that sets that flag.
///
/// `poll` drives the wrapped future with `cached_waker`; `is_awake` reports
/// whether that waker has fired since the flag was last cleared.
#[pin_project]
pub struct SimpleRunner<F: Future> {
/// The wrapped future; `#[pin]` makes `project()` hand it out pinned.
#[pin]
future: F,
/// Wake flag: starts as `true`, is cleared at the start of every `poll`,
/// and is set back to `true` by the waker.
is_awake: Arc<AtomicBool>,
/// Waker created once in `runner` and reused for every poll.
cached_waker: Waker,
}
impl<F: Future> SimpleRunner<F> {
pub fn is_awake(&self) -> bool {
self.is_awake.load(Ordering::Acquire)
}
pub fn poll(self: Pin<&mut Self>) -> Poll<F::Output> {
let this = self.project();
this.is_awake.store(false, Ordering::Release);
let mut ctx = Context::from_waker(&this.cached_waker);
this.future.poll(&mut ctx)
}
}
/// Returns a future that is `Pending` on its first poll and `Ready` on the next.
///
/// The first poll wakes the task's waker right away, so the task is marked
/// as ready to be polled again immediately.
pub fn yield_() -> impl Future<Output = ()> {
    YieldFuture(true)
}

/// Future behind [`yield_`]; the flag is `true` until it has been polled once.
struct YieldFuture(bool);

impl Future for YieldFuture {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
        // Read the flag and clear it in one step.
        let first_poll = mem::replace(&mut self.0, false);
        if !first_poll {
            return Poll::Ready(());
        }
        ctx.waker().wake_by_ref();
        Poll::Pending
    }
}
/// Hand-rolled [`Waker`] backed by an `Arc<AtomicBool>`: waking stores `true` in the flag.
///
/// The waker's data pointer is the result of `Arc::into_raw`, so every live
/// waker built here owns exactly one strong reference to the flag.
mod waker {
    use super::*;

    /// Builds a waker that sets `data` to `true` whenever it is woken.
    ///
    /// The strong reference held by `data` moves into the waker and is
    /// released again by `v_wake` or `v_drop`.
    pub fn create(data: Arc<AtomicBool>) -> Waker {
        let raw_waker = RawWaker::new(Arc::into_raw(data) as *const (), &VTABLE);
        // SAFETY: the data pointer comes from `Arc::<AtomicBool>::into_raw`, which is
        // exactly what every function in `VTABLE` expects to receive.
        unsafe { Waker::from_raw(raw_waker) }
    }

    /// Views the flag behind `p` as an `Arc` without taking over its strong reference.
    ///
    /// # Safety
    ///
    /// `p` must originate from `Arc::<AtomicBool>::into_raw` and the reference it
    /// stands for must not have been released yet.
    unsafe fn borrow_flag(p: *const ()) -> mem::ManuallyDrop<Arc<AtomicBool>> {
        // SAFETY: guaranteed by the caller. `ManuallyDrop` keeps the reference count
        // untouched when the returned value goes out of scope.
        mem::ManuallyDrop::new(unsafe { Arc::from_raw(p as *const AtomicBool) })
    }

    /// Vtable `clone`: creates a second waker that owns its own strong reference.
    unsafe fn v_clone(p: *const ()) -> RawWaker {
        // SAFETY: `p` was produced by `create` or `v_clone` and is still alive.
        let flag = unsafe { borrow_flag(p) };
        let extra: Arc<AtomicBool> = Arc::clone(&*flag);
        RawWaker::new(Arc::into_raw(extra) as *const (), &VTABLE)
    }

    /// Vtable `wake`: sets the flag and releases the waker's strong reference.
    unsafe fn v_wake(p: *const ()) {
        // SAFETY: `wake` consumes the waker, so ownership of its reference is taken here.
        let flag: Arc<AtomicBool> = unsafe { Arc::from_raw(p as *const AtomicBool) };
        flag.store(true, Ordering::Release);
    }

    /// Vtable `wake_by_ref`: sets the flag and leaves the reference count alone.
    unsafe fn v_wake_by_ref(p: *const ()) {
        // SAFETY: `p` was produced by `create` or `v_clone` and is still alive.
        let flag = unsafe { borrow_flag(p) };
        flag.store(true, Ordering::Release);
    }

    /// Vtable `drop`: releases the waker's strong reference.
    unsafe fn v_drop(p: *const ()) {
        // The pointee type is spelled out on purpose: nothing else in this expression
        // constrains it, and rebuilding the `Arc` with any type other than `AtomicBool`
        // would release the allocation with the wrong layout.
        // SAFETY: `p` was produced by `create` or `v_clone`, and the waker is being dropped.
        mem::drop(unsafe { Arc::<AtomicBool>::from_raw(p as *const AtomicBool) });
    }

    // A `static` (rather than a `const`) gives the vtable a single address that is
    // shared by every waker created in this module.
    static VTABLE: RawWakerVTable =
        RawWakerVTable::new(v_clone, v_wake, v_wake_by_ref, v_drop);
}
#[cfg(test)]
mod tests {
    use super::*;
    use futures_channel::{
        mpsc,
        oneshot,
    };
    use futures_util::{
        pin_mut,
        stream::StreamExt,
    };

    /// All items are queued and the channel is closed up front, so one poll completes the task.
    #[test]
    fn channel_take_preset() {
        let (mut tx, rx) = mpsc::channel(5);
        for value in 1..=3u32 {
            tx.try_send(value).unwrap();
        }
        tx.close_channel();

        let task = runner(async move {
            pin_mut!(rx);
            for expected in 1..=3u32 {
                assert_eq!(rx.next().await, Some(expected));
            }
            assert_eq!(rx.next().await, None);
            42
        });
        pin_mut!(task);
        assert_eq!(task.as_mut().poll(), Poll::Ready(42));
    }

    /// Items arrive between polls; each send must flip the runner's wake flag.
    #[test]
    fn channel_take_set_during() {
        let (mut tx, rx) = mpsc::channel::<(u32, oneshot::Sender<u32>)>(5);
        let task = runner(async move {
            println!("Enter");
            pin_mut!(rx);
            for expected in 0..5u32 {
                println!("Receiving {}", expected);
                let (value, reply) = rx.next().await.unwrap();
                assert_eq!(value, expected);
                println!("Received");
                reply.send(value * 2).unwrap();
            }
            assert!(rx.next().await.is_none());
            64
        });
        pin_mut!(task);
        assert_eq!(task.as_mut().poll(), Poll::Pending);

        for round in 0..5u32 {
            println!("Sending {}", round);
            assert!(!task.is_awake());
            let (reply_tx, mut reply_rx) = oneshot::channel();
            tx.try_send((round, reply_tx)).unwrap();
            assert!(task.is_awake());
            assert_eq!(task.as_mut().poll(), Poll::Pending);
            assert_eq!(reply_rx.try_recv(), Ok(Some(round * 2)));
        }

        tx.close_channel();
        assert_eq!(task.as_mut().poll(), Poll::Ready(64));
    }

    /// Each `yield_` suspends once and leaves the runner awake for the next poll.
    #[test]
    fn yield_test() {
        let task = runner(async move {
            for _ in 0..3 {
                yield_().await;
            }
            32
        });
        pin_mut!(task);

        for _ in 0..3 {
            assert!(task.is_awake());
            assert_eq!(task.as_mut().poll(), Poll::Pending);
        }
        assert!(task.is_awake());
        assert_eq!(task.as_mut().poll(), Poll::Ready(32));
    }
}