Crate waker_waiter
source ·Expand description
This library helps async runtimes support the execution of arbitrary futures, by enabling futures to provide their own event polling logic. It is an attempt to implement the approach described by Context reactor hook.
There are two integration points:
- Futures that need to run their own event polling logic in the execution thread must call `get_poller` to obtain a `TopLevelPoller` and then call `TopLevelPoller::set_waiter` to register a `WakerWaiter` on it.
- Whatever part of the application is responsible for polling top-level futures (i.e. the async runtime) needs to implement the `TopLevelPoller` trait and provide it using `std::task::ContextBuilder::ext`. This library provides such an implementation via `block_on`.
Only one `WakerWaiter` can be registered on a `TopLevelPoller`. If more than one future relies on the same event polling logic, the futures should coordinate and share the same `WakerWaiter`.
§Example of a future registering a WakerWaiter
/// Process-wide slot for the shared reactor. It starts out empty and is
/// filled in by `Reactor::current` the first time that function runs.
static REACTOR: Mutex<Option<Arc<Reactor>>> = Mutex::new(None);
/// Example reactor whose `WakerWaiter` is what `MyFuture` registers on the
/// top-level poller.
struct Reactor {
// Populated by `Reactor::current` before the reactor is published in
// `REACTOR`; `Reactor::waiter` unwraps it.
waiter: Option<WakerWaiter>,
}
impl Reactor {
    /// Returns the shared reactor, creating and publishing it in `REACTOR`
    /// on first use.
    ///
    /// The reactor and its waiter refer to each other (the reactor stores the
    /// `WakerWaiter`, the waiter holds a `Weak` back to the reactor), so the
    /// pair is built with `Arc::new_cyclic`. That hands out the `Weak` before
    /// the `Arc` exists and needs no `unsafe` post-construction mutation.
    ///
    /// # Panics
    ///
    /// Panics if the `REACTOR` mutex is poisoned.
    fn current() -> Arc<Reactor> {
        let mut slot = REACTOR.lock().unwrap();
        let shared = slot.get_or_insert_with(|| {
            Arc::new_cyclic(|weak| Reactor {
                waiter: Some(
                    Arc::new(ReactorWaiter {
                        reactor: Weak::clone(weak),
                    })
                    .into(),
                ),
            })
        });
        Arc::clone(shared)
    }

    /// Borrows the waiter owned by this reactor.
    fn waiter<'a>(self: &'a Arc<Self>) -> &'a WakerWaiter {
        self.waiter
            .as_ref()
            .expect("Reactor::current always stores a waiter before publishing the reactor")
    }
}
/// Waiter side of the reactor; `Reactor::current` wraps it in an `Arc` and
/// converts it into the `WakerWaiter` stored on the reactor.
struct ReactorWaiter {
// Weak back-reference to the owning reactor — presumably weak so that the
// reactor and its waiter do not keep each other alive.
reactor: Weak<Reactor>,
}
/// Blocking-wait and cancel hooks for the reactor's waiter.
/// Both bodies are placeholders (`todo!()`) in this example.
impl WakerWait for ReactorWaiter {
/// Placeholder for the reactor's blocking event poll.
fn wait(self: &Arc<Self>) {
// ... blocking poll for events ...
todo!();
}
/// Placeholder for producing the handle that unblocks `wait`.
fn canceler(self: &Arc<Self>) -> WakerWaiterCanceler {
// ... provide a way to unblock the above ...
todo!();
}
}
/// Example future that relies on the shared reactor for its event polling.
struct MyFuture;

impl Future for MyFuture {
    /// Registers the reactor's waiter on the context's top-level poller
    /// before doing any work.
    ///
    /// # Panics
    ///
    /// Panics if the context carries no `TopLevelPoller`, or if the poller
    /// rejects the reactor's waiter.
    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        // Without a poller in the context there is nowhere to register.
        let Some(poller) = get_poller(cx) else {
            panic!("MyFuture requires context to provide TopLevelPoller");
        };

        let rejected = poller.set_waiter(Reactor::current().waiter()).is_err();
        if rejected {
            panic!("Incompatible waiter already assigned to TopLevelPoller");
        }
        // ... register waker, perform I/O, etc ...
    }
}
§Example of an executor providing a TopLevelPoller
#![feature(local_waker)]
#![feature(context_ext)]
/// Waker used by the example executor loop.
struct ThreadWaker {
// Thread captured when the waker is built; `wake` compares the calling
// thread against it and unparks it when no waiter is configured.
thread: Thread,
// Slot shared with `MyTopLevelPoller`; holds the registered waiter, if any.
waiter: Arc<Mutex<Option<WakerWaiter>>>,
}
impl Wake for ThreadWaker {
    /// Unblocks the execution thread when the wake comes from another thread.
    fn wake(self: Arc<Self>) {
        // A wake raised on the execution thread itself was caused by the
        // WakerWaiter, which hands control back without any signaling.
        let woken_elsewhere = thread::current().id() != self.thread.id();
        if woken_elsewhere {
            // Copy the waiter out first so the lock is released before
            // signaling.
            let configured = self.waiter.lock().unwrap().clone();
            match configured {
                // The execution thread is blocked inside the waiter, so it
                // has to be cancelled.
                Some(active) => {
                    active.canceler().cancel();
                }
                // No waiter configured: the execution thread is parked.
                None => {
                    self.thread.unpark();
                }
            }
        }
    }
}
/// Example `TopLevelPoller` that remembers at most one registered waiter.
#[derive(Clone)]
struct MyTopLevelPoller {
// Slot shared with `ThreadWaker`; `set_waiter` fills it and the executor
// loop reads it to decide how to block.
waiter: Arc<Mutex<Option<WakerWaiter>>>,
}
impl TopLevelPoller for MyTopLevelPoller {
    /// Registers `waiter` unless a different one is already registered.
    ///
    /// Returns `Ok(())` when the slot was empty (the waiter is stored) or
    /// already holds an equal waiter, and `Err(SetWaiterError)` when a
    /// different waiter is already in place.
    fn set_waiter(&mut self, waiter: &WakerWaiter) -> Result<(), SetWaiterError> {
        let mut slot = self.waiter.lock().unwrap();
        match slot.as_ref() {
            // already set to this waiter
            Some(existing) if existing == waiter => Ok(()),
            // already set to a different waiter
            Some(_) => Err(SetWaiterError),
            None => {
                *slot = Some(waiter.clone());
                Ok(())
            }
        }
    }
}
// Slot shared by the waker (which reads it) and the poller (which fills it).
let waiter = Arc::new(Mutex::new(None));
let waker = Arc::new(ThreadWaker {
    thread: thread::current(),
    waiter: Arc::clone(&waiter),
}).into();
let mut poller = MyTopLevelPoller { waiter };
let mut fut = pin!(async { /* ... */ });
loop {
    let result = {
        // A fresh context is built for every poll so that it can carry the
        // poller as its extension data.
        let mut a = Anyable::new(&mut poller as &mut dyn TopLevelPoller);
        let mut cx = ContextBuilder::from_waker(&waker).ext(a.as_any()).build();
        fut.as_mut().poll(&mut cx)
    };
    match result {
        Poll::Ready(res) => break res,
        Poll::Pending => {
            // Copy the waiter out so the lock is not held while blocking.
            let waiter = poller.waiter.lock().unwrap().clone();
            // if a waiter is configured then block on it. else do a
            // standard thread park
            match waiter {
                Some(waiter) => waiter.wait(),
                None => thread::park(),
            }
        }
    }
}
Re-exports§
pub use ctx_ref::Anyable;