use std::async_iter::AsyncIterator;
use std::future::Future;
use std::mem::ManuallyDrop;
use std::pin::Pin;
use std::task::{self, Poll};
use std::time::{Duration, Instant};
use std::{io, ptr};
use heph::actor;
use crate::{self as rt, Bound};
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub struct DeadlinePassed;
impl From<DeadlinePassed> for io::Error {
fn from(_: DeadlinePassed) -> io::Error {
io::ErrorKind::TimedOut.into()
}
}
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Timer<RT: rt::Access> {
deadline: Instant,
rt: RT,
}
impl<RT: rt::Access> Timer<RT> {
pub fn at<M>(ctx: &mut actor::Context<M, RT>, deadline: Instant) -> Timer<RT>
where
RT: Clone,
{
let mut rt = ctx.runtime().clone();
rt.add_deadline(deadline);
Timer { deadline, rt }
}
pub fn after<M>(ctx: &mut actor::Context<M, RT>, timeout: Duration) -> Timer<RT>
where
RT: Clone,
{
Timer::at(ctx, Instant::now() + timeout)
}
pub const fn deadline(&self) -> Instant {
self.deadline
}
pub fn has_passed(&self) -> bool {
self.deadline <= Instant::now()
}
pub fn wrap<Fut>(self, future: Fut) -> Deadline<Fut, RT> {
let this = ManuallyDrop::new(self);
let deadline = unsafe { ptr::addr_of!(this.deadline).read() };
let rt = unsafe { ptr::addr_of!(this.rt).read() };
Deadline {
deadline,
future,
rt,
}
}
}
impl<RT: rt::Access> Future for Timer<RT> {
type Output = DeadlinePassed;
fn poll(self: Pin<&mut Self>, _: &mut task::Context<'_>) -> Poll<Self::Output> {
if self.has_passed() {
Poll::Ready(DeadlinePassed)
} else {
Poll::Pending
}
}
}
impl<RT: rt::Access> Unpin for Timer<RT> {}
impl<RT: rt::Access> Bound<RT> for Timer<RT> {
type Error = io::Error;
fn bind_to<M>(&mut self, ctx: &mut actor::Context<M, RT>) -> io::Result<()> {
let old_pid = self.rt.change_pid(ctx.runtime_ref().pid());
self.rt.change_deadline(old_pid, self.deadline);
Ok(())
}
}
impl<RT: rt::Access> Drop for Timer<RT> {
fn drop(&mut self) {
self.rt.remove_deadline(self.deadline);
}
}
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Deadline<Fut, RT: rt::Access> {
deadline: Instant,
future: Fut,
rt: RT,
}
impl<Fut, RT: rt::Access> Deadline<Fut, RT> {
pub fn at<M>(
ctx: &mut actor::Context<M, RT>,
deadline: Instant,
future: Fut,
) -> Deadline<Fut, RT>
where
RT: Clone,
{
let mut rt = ctx.runtime().clone();
rt.add_deadline(deadline);
Deadline {
deadline,
future,
rt,
}
}
pub fn after<M>(
ctx: &mut actor::Context<M, RT>,
timeout: Duration,
future: Fut,
) -> Deadline<Fut, RT>
where
RT: Clone,
{
Deadline::at(ctx, Instant::now() + timeout, future)
}
pub const fn deadline(&self) -> Instant {
self.deadline
}
pub fn has_passed(&self) -> bool {
self.deadline <= Instant::now()
}
pub const fn get_ref(&self) -> &Fut {
&self.future
}
pub fn get_mut(&mut self) -> &mut Fut {
&mut self.future
}
pub fn into_inner(mut self) -> Fut {
self.rt.remove_deadline(self.deadline);
let mut this = ManuallyDrop::new(self);
unsafe { ptr::addr_of_mut!(this.deadline).drop_in_place() }
unsafe { ptr::addr_of_mut!(this.rt).drop_in_place() }
unsafe { ptr::addr_of!(this.future).read() }
}
}
impl<Fut, RT: rt::Access, T, E> Future for Deadline<Fut, RT>
where
Fut: Future<Output = Result<T, E>>,
E: From<DeadlinePassed>,
{
type Output = Result<T, E>;
fn poll(self: Pin<&mut Self>, ctx: &mut task::Context<'_>) -> Poll<Self::Output> {
if self.has_passed() {
Poll::Ready(Err(DeadlinePassed.into()))
} else {
let future = unsafe { Pin::map_unchecked_mut(self, |this| &mut this.future) };
future.poll(ctx)
}
}
}
impl<Fut: Unpin, RT: rt::Access> Unpin for Deadline<Fut, RT> {}
impl<Fut, RT: rt::Access> Bound<RT> for Deadline<Fut, RT> {
type Error = io::Error;
fn bind_to<M>(&mut self, ctx: &mut actor::Context<M, RT>) -> io::Result<()> {
let old_pid = self.rt.change_pid(ctx.runtime_ref().pid());
self.rt.change_deadline(old_pid, self.deadline);
Ok(())
}
}
impl<Fut, RT: rt::Access> Drop for Deadline<Fut, RT> {
fn drop(&mut self) {
self.rt.remove_deadline(self.deadline);
}
}
#[derive(Debug)]
#[must_use = "AsyncIterators do nothing unless polled"]
pub struct Interval<RT: rt::Access> {
deadline: Instant,
interval: Duration,
rt: RT,
}
impl<RT: rt::Access> Interval<RT> {
pub fn every<M>(ctx: &mut actor::Context<M, RT>, interval: Duration) -> Interval<RT>
where
RT: Clone,
{
let deadline = Instant::now() + interval;
let mut rt = ctx.runtime().clone();
rt.add_deadline(deadline);
Interval {
deadline,
interval,
rt,
}
}
pub const fn next_deadline(&self) -> Instant {
self.deadline
}
}
impl<RT: rt::Access> AsyncIterator for Interval<RT> {
type Item = DeadlinePassed;
fn poll_next(self: Pin<&mut Self>, _: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
if self.deadline <= Instant::now() {
let next_deadline = Instant::now() + self.interval;
let this = Pin::get_mut(self);
this.deadline = next_deadline;
this.rt.add_deadline(next_deadline);
Poll::Ready(Some(DeadlinePassed))
} else {
Poll::Pending
}
}
}
impl<RT: rt::Access> Unpin for Interval<RT> {}
impl<RT: rt::Access> Bound<RT> for Interval<RT> {
type Error = !;
fn bind_to<M>(&mut self, ctx: &mut actor::Context<M, RT>) -> Result<(), !> {
let old_pid = self.rt.change_pid(ctx.runtime_ref().pid());
self.rt.change_deadline(old_pid, self.deadline);
Ok(())
}
}
impl<RT: rt::Access> Drop for Interval<RT> {
fn drop(&mut self) {
self.rt.remove_deadline(self.deadline);
}
}