use std::{future::Future, pin::Pin, rc::Rc, task::Poll, time::Duration};
use futures::pin_mut;
pub mod mutex;
pub mod oneshot;
pub mod timeout;
pub mod watch;
pub use mutex::Mutex;
pub use futures;
/// Error value whose display message is `sender dropped`.
///
/// NOTE(review): presumably returned by the receiving halves in the `oneshot`
/// and `watch` submodules once every sender is gone; confirm against those
/// modules, which are not visible from this file.
#[derive(thiserror::Error, Debug, PartialEq, Eq)]
#[error("sender dropped")]
pub struct RecvError;
/// Adapter that turns an `Rc<FiberWaker>` into a [`std::task::Waker`] whose
/// wake-up broadcasts on a [`fiber::Cond`].
mod waker {
    use crate::fiber;
    use std::rc::Rc;
    use std::task::RawWaker;
    use std::task::RawWakerVTable;
    use std::task::Waker;

    /// Wake-up handle backed by a fiber condition variable.
    #[derive(Default)]
    pub struct FiberWaker {
        cond: fiber::Cond,
    }

    impl FiberWaker {
        /// Returns the condition variable that [`FiberWaker::wake`] broadcasts on.
        pub fn cond(&self) -> &fiber::Cond {
            &self.cond
        }

        /// Broadcasts on the underlying condition variable.
        pub fn wake(&self) {
            self.cond.broadcast()
        }
    }

    // NOTE(review): these impls assert thread-safety for a type that is handed
    // around inside a non-atomic `Rc`. Presumably this relies on wakers never
    // leaving the thread that created them; confirm before using the waker
    // from another OS thread.
    unsafe impl Send for FiberWaker {}
    unsafe impl Sync for FiberWaker {}

    /// Builds a [`Waker`] that owns `rcw` and calls [`FiberWaker::wake`] when woken.
    pub fn with_rcw(rcw: Rc<FiberWaker>) -> Waker {
        let raw_waker = raw_waker(rcw);
        // SAFETY: the vtable functions below uphold the `RawWaker` contract for
        // a data pointer produced by `Rc::<FiberWaker>::into_raw`.
        unsafe { Waker::from_raw(raw_waker) }
    }

    /// Converts `rcw` into a [`RawWaker`], transferring one strong reference
    /// into the raw data pointer.
    fn raw_waker(rcw: Rc<FiberWaker>) -> RawWaker {
        const RC_WAKER_VT: RawWakerVTable = RawWakerVTable::new(
            rc_waker_clone,
            rc_waker_wake,
            rc_waker_wake_by_ref,
            rc_waker_drop,
        );
        let ptr: *const () = Rc::into_raw(rcw).cast();
        RawWaker::new(ptr, &RC_WAKER_VT)
    }

    /// Vtable `clone`: produces a second raw waker sharing the same `FiberWaker`.
    ///
    /// # Safety
    /// `data` must come from `Rc::<FiberWaker>::into_raw` and still hold a
    /// strong reference.
    unsafe fn rc_waker_clone(data: *const ()) -> RawWaker {
        // Restore the real pointee type *before* touching the reference count:
        // passing the untyped `*const ()` would make the call operate on an
        // `Rc<()>` instead of an `Rc<FiberWaker>`.
        let ptr: *const FiberWaker = data.cast();
        Rc::increment_strong_count(ptr);
        // The extra strong count added above is owned by this new `Rc`.
        raw_waker(Rc::from_raw(ptr))
    }

    /// Vtable `wake`: wakes and releases the strong reference held by `data`.
    ///
    /// # Safety
    /// Same requirements as [`rc_waker_clone`]; `data` must not be used afterwards.
    unsafe fn rc_waker_wake(data: *const ()) {
        let rcw: Rc<FiberWaker> = Rc::from_raw(data.cast());
        rcw.wake();
        // `rcw` is dropped here, giving up the reference the waker owned.
    }

    /// Vtable `wake_by_ref`: wakes without consuming the reference held by `data`.
    ///
    /// # Safety
    /// Same requirements as [`rc_waker_clone`].
    unsafe fn rc_waker_wake_by_ref(data: *const ()) {
        // Borrow the pointee directly instead of materialising an owning `Rc`
        // and forgetting it afterwards: the reference count is never touched,
        // so an unwinding `wake` cannot release a reference we do not own.
        let fiber_waker: &FiberWaker = &*data.cast::<FiberWaker>();
        fiber_waker.wake();
    }

    /// Vtable `drop`: releases the strong reference held by `data`.
    ///
    /// # Safety
    /// Same requirements as [`rc_waker_clone`]; `data` must not be used afterwards.
    unsafe fn rc_waker_drop(data: *const ()) {
        drop(Rc::<FiberWaker>::from_raw(data.cast()));
    }
}
/// Polling context that carries extra per-poll requests (a deadline and an
/// optional file-descriptor wait) next to the standard [`std::task::Context`].
pub(crate) mod context {
    use std::os::unix::io::RawFd;
    use std::task::Context;
    use std::task::Waker;

    use crate::ffi::tarantool as ffi;
    use crate::time::Instant;

    /// A [`Context`] followed by extension fields.
    ///
    /// The struct is `#[repr(C)]` with `cx` as its first field, which is what
    /// allows [`ContextExt::as_context_ext`] to cast a `Context` pointer back
    /// to the enclosing `ContextExt`.
    #[repr(C)]
    pub struct ContextExt<'a> {
        cx: Context<'a>,
        /// Earliest deadline requested through [`ContextExt::set_deadline`].
        pub(super) deadline: Option<Instant>,
        /// Descriptor and event flags recorded by [`ContextExt::set_coio_wait`].
        pub(super) coio_wait: Option<(RawFd, ffi::CoIOFlags)>,
    }

    impl<'a> ContextExt<'a> {
        /// Creates an extended context around `waker` with no deadline and no
        /// descriptor wait recorded.
        #[must_use]
        pub fn from_waker(waker: &'a Waker) -> Self {
            let cx = Context::from_waker(waker);
            Self {
                cx,
                deadline: None,
                coio_wait: None,
            }
        }

        /// Returns the embedded standard context, suitable for `Future::poll`.
        pub fn cx(&mut self) -> &mut Context<'a> {
            &mut self.cx
        }

        /// Reinterprets `cx` as the `ContextExt` that contains it.
        ///
        /// # Safety
        /// `cx` must be the `cx` field of a live `ContextExt`; passing any
        /// other `Context` makes the returned reference point at memory that
        /// is not a `ContextExt`.
        pub(crate) unsafe fn as_context_ext<'b>(cx: &'b mut Context<'_>) -> &'b mut Self {
            let raw: *mut Self = (cx as *mut Context<'_>).cast();
            &mut *raw
        }

        /// Records `new` as the deadline unless an earlier one is already stored.
        ///
        /// # Safety
        /// Same requirement as [`ContextExt::as_context_ext`].
        pub unsafe fn set_deadline(cx: &mut Context<'_>, new: Instant) {
            let ext = Self::as_context_ext(cx);
            match ext.deadline.as_mut() {
                // Only ever move the stored deadline earlier.
                Some(current) if new < *current => *current = new,
                Some(_) => {}
                None => ext.deadline = Some(new),
            }
        }

        /// Records the descriptor and event flags to wait for, replacing any
        /// previously recorded pair.
        ///
        /// # Safety
        /// Same requirement as [`ContextExt::as_context_ext`].
        pub unsafe fn set_coio_wait(cx: &mut Context<'_>, fd: RawFd, event: ffi::CoIOFlags) {
            Self::as_context_ext(cx).coio_wait = Some((fd, event));
        }
    }
}
/// Future adapter that runs a callback when it is dropped.
///
/// Polling is forwarded unchanged to the wrapped future. The callback is
/// invoked from [`Drop`], regardless of whether the wrapped future has
/// completed. Values are created with [`on_drop`].
pub struct OnDrop<Fut, Fun: FnOnce()> {
    future: Fut,
    // Kept in an `Option` so that `Drop::drop`, which only receives
    // `&mut self`, can move the `FnOnce` out in order to call it.
    on_drop: Option<Fun>,
}

impl<Fut: Future, Fun: FnOnce()> OnDrop<Fut, Fun> {
    /// Projects a pinned `OnDrop` to its pinned inner future.
    #[inline]
    fn pin_get_future(self: Pin<&mut Self>) -> Pin<&mut Fut> {
        // SAFETY: `future` is only ever accessed through this projection and is
        // never moved out of `self`; the `Drop` impl touches `on_drop` only.
        unsafe { self.map_unchecked_mut(|s| &mut s.future) }
    }
}

impl<Fut: Future, Fun: FnOnce()> Future for OnDrop<Fut, Fun> {
    type Output = Fut::Output;

    /// Polls the wrapped future and returns its result as is.
    fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
        self.pin_get_future().poll(cx)
    }
}

impl<Fut, Fun: FnOnce()> Drop for OnDrop<Fut, Fun> {
    /// Runs the callback, if one is still stored.
    fn drop(&mut self) {
        // Never `unwrap` here: a panic raised from `drop` while another panic
        // is unwinding aborts the process, so an empty slot is simply skipped.
        if let Some(callback) = self.on_drop.take() {
            callback();
        }
    }
}

/// Wraps `future` so that `on_drop` is called once when the returned value is
/// dropped.
pub fn on_drop<Fut: Future, Fun: FnOnce()>(future: Fut, on_drop: Fun) -> OnDrop<Fut, Fun> {
    OnDrop {
        future,
        on_drop: Some(on_drop),
    }
}
pub trait IntoOnDrop: Future + Sized {
#[inline]
fn on_drop<Fun: FnOnce()>(self, on_drop: Fun) -> OnDrop<Self, Fun> {
self::on_drop(self, on_drop)
}
}
impl<T> IntoOnDrop for T where T: Future + Sized {}
/// Drives `f` to completion and returns its output.
///
/// The future is polled in a loop. After each `Pending` result the caller is
/// suspended until woken or until the deadline recorded in the poll context
/// expires (no deadline means `Duration::MAX`):
/// - if the poll recorded a descriptor wait, by calling tarantool's
///   `coio_wait` with that descriptor, the event bits and the timeout in
///   seconds;
/// - otherwise by waiting on the waker's condition variable with the timeout.
pub fn block_on<F: Future>(f: F) -> F::Output {
    let fiber_waker = Rc::new(waker::FiberWaker::default());
    let waker = waker::with_rcw(Rc::clone(&fiber_waker));
    pin_mut!(f);
    loop {
        // A fresh extended context per iteration, so requests from a previous
        // poll do not leak into the next one.
        let mut ext = context::ContextExt::from_waker(&waker);
        match f.as_mut().poll(ext.cx()) {
            Poll::Ready(output) => return output,
            Poll::Pending => {}
        }

        let timeout = ext.deadline.map_or(Duration::MAX, |deadline| {
            deadline.duration_since(super::clock())
        });

        match ext.coio_wait {
            Some((fd, event)) => unsafe {
                crate::ffi::tarantool::coio_wait(fd, event.bits(), timeout.as_secs_f64());
            },
            None => {
                fiber_waker.cond().wait_timeout(timeout);
            }
        }
    }
}
/// Completes once the timeout of length `time` fires.
///
/// Implemented by awaiting a oneshot receiver, whose sender is kept alive but
/// never used, under a timeout of `time`.
///
/// # Panics
/// Panics if the timed receive returns `Ok`, which is not expected because
/// nothing is ever sent.
pub async fn sleep(time: Duration) {
    use timeout::IntoTimeout as _;

    // The named binding keeps the sender alive until the end of the function,
    // i.e. until after the timed receive has finished.
    let (_sender, receiver) = oneshot::channel::<()>();
    receiver.timeout(time).await.unwrap_err();
}
// Internal tests for `sleep` and the `OnDrop` adapter; compiled only with the
// `internal_test` feature.
#[cfg(feature = "internal_test")]
mod tests {
use std::cell::Cell;
use super::timeout::IntoTimeout as _;
use super::*;
use crate::fiber;
use crate::test::util::{always_pending, ok};
// `sleep` must yield the fiber and must not return before the requested
// duration has elapsed.
#[crate::test(tarantool = "crate")]
fn sleep_wakes_up() {
let before_sleep = fiber::clock();
let sleep_for = Duration::from_millis(100);
let should_yield = fiber::check_yield(|| fiber::block_on(sleep(sleep_for)));
assert_eq!(should_yield, fiber::YieldResult::Yielded(()));
assert!(before_sleep.elapsed() >= sleep_for);
}
// The drop callback must run both when the wrapped future is cut off by a
// timeout and when it completes.
#[crate::test(tarantool = "crate")]
fn on_drop_is_executed() {
block_on(async {
// Case 1: the zero timeout yields an error (`unwrap_err`), and the
// callback must have run by then.
// NOTE(review): `always_pending` presumably never completes; confirm in
// `crate::test::util`.
let mut executed = false;
always_pending()
.on_drop(|| executed = true)
.timeout(Duration::from_secs(0))
.await
.unwrap_err();
assert!(executed);
// Case 2: the wrapped future is immediately ready, the timeout returns `Ok`
// (`unwrap`), and the callback must still have run.
let mut executed = false;
std::future::ready(ok(()))
.on_drop(|| executed = true)
.timeout(Duration::from_secs(0))
.await
.unwrap();
assert!(executed);
});
}
// The callback must also run when the `OnDrop` future is nested inside
// another async block that is the one being timed out.
#[crate::test(tarantool = "crate")]
fn nested_on_drop_is_executed() {
// A `Cell` lets the callback set the flag through a shared borrow, so the
// flag can be read after `block_on` returns.
let executed = Rc::new(Cell::new(false));
let f = async { always_pending().on_drop(|| executed.set(true)).await };
block_on(async {
f.timeout(Duration::from_secs(0)).await.unwrap_err();
});
assert!(executed.get());
}
}