use std::{future::Future, rc::Rc, task::Poll, time::Instant};
use futures::pin_mut;
pub mod oneshot;
pub mod timeout;
pub mod watch;
#[derive(thiserror::Error, Debug, PartialEq, Eq)]
#[error("sender dropped")]
pub struct RecvError;
mod waker {
use crate::fiber;
use std::rc::Rc;
use std::task::RawWaker;
use std::task::RawWakerVTable;
use std::task::Waker;
#[derive(Default)]
pub struct FiberWaker {
cond: fiber::Cond,
}
impl FiberWaker {
pub fn cond(&self) -> &fiber::Cond {
&self.cond
}
pub fn wake(&self) {
self.cond.broadcast()
}
}
unsafe impl Send for FiberWaker {}
unsafe impl Sync for FiberWaker {}
pub fn with_rcw(rcw: Rc<FiberWaker>) -> Waker {
let raw_waker = raw_waker(rcw);
unsafe { Waker::from_raw(raw_waker) }
}
fn raw_waker(rcw: Rc<FiberWaker>) -> RawWaker {
const RC_WAKER_VT: RawWakerVTable = RawWakerVTable::new(
rc_waker_clone,
rc_waker_wake,
rc_waker_wake_by_ref,
rc_waker_drop,
);
let ptr: *const () = Rc::into_raw(rcw).cast();
RawWaker::new(ptr, &RC_WAKER_VT)
}
unsafe fn rc_waker_clone(data: *const ()) -> RawWaker {
let rcw: Rc<FiberWaker> = {
Rc::increment_strong_count(data);
Rc::from_raw(data.cast())
};
raw_waker(rcw)
}
unsafe fn rc_waker_wake(data: *const ()) {
let rcw: Rc<FiberWaker> = Rc::from_raw(data.cast());
rcw.wake();
drop(rcw);
}
unsafe fn rc_waker_wake_by_ref(data: *const ()) {
let rcw: Rc<FiberWaker> = Rc::from_raw(data.cast());
rcw.wake();
std::mem::forget(rcw);
}
unsafe fn rc_waker_drop(data: *const ()) {
let rcw: Rc<FiberWaker> = Rc::from_raw(data.cast());
drop(rcw)
}
}
mod context {
use std::task::Context;
use std::task::Waker;
use std::time::Instant;
#[repr(C)]
pub struct ContextExt<'a> {
cx: Context<'a>,
deadline: Option<Instant>,
}
impl<'a> ContextExt<'a> {
#[must_use]
pub fn from_waker(waker: &'a Waker) -> Self {
Self {
cx: Context::from_waker(waker),
deadline: None,
}
}
pub fn cx(&mut self) -> &mut Context<'a> {
&mut self.cx
}
pub fn deadline(&self) -> Option<Instant> {
self.deadline
}
pub unsafe fn set_deadline(cx: &mut Context<'_>, new: Instant) {
let cx: &mut ContextExt = &mut *(cx as *mut Context).cast();
if matches!(cx.deadline, Some(old) if new > old) {
return;
}
cx.deadline = Some(new);
}
}
}
pub fn block_on<F: Future>(f: F) -> F::Output {
let rcw: Rc<waker::FiberWaker> = Default::default();
let waker = waker::with_rcw(rcw.clone());
pin_mut!(f);
loop {
let mut cx = context::ContextExt::from_waker(&waker);
if let Poll::Ready(t) = f.as_mut().poll(cx.cx()) {
return t;
}
match cx.deadline() {
Some(deadline) => {
let timeout = deadline.saturating_duration_since(Instant::now());
rcw.cond().wait_timeout(timeout)
}
None => rcw.cond().wait(),
};
}
}