use core::future::{Future, poll_fn};
use core::pin::Pin;
use core::task::{Context, Poll};
use futures_core::Stream;
use futures_core::stream::FusedStream;
use crate::{Duration, Instant};
/// Error produced by a [`TimeoutFuture`] when its timer expires before the
/// wrapped future has completed.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub struct TimeoutError;
/// Wraps `fut` so that it gives up once `timeout` has elapsed.
///
/// The deadline is computed eagerly by [`Timer::after`], i.e. the countdown
/// starts when this function is called rather than when the returned future
/// is first polled.
///
/// The returned [`TimeoutFuture`] resolves to `Ok(output)` if `fut` completes
/// first, and to `Err(TimeoutError)` if the timer fires first.
pub fn with_timeout<F: Future>(timeout: Duration, fut: F) -> TimeoutFuture<F> {
    let timer = Timer::after(timeout);
    TimeoutFuture { timer, fut }
}
/// Wraps `fut` so that it gives up once the instant `at` has been reached.
///
/// The returned [`TimeoutFuture`] resolves to `Ok(output)` if `fut` completes
/// first, and to `Err(TimeoutError)` if the timer built by [`Timer::at`] fires
/// first.
pub fn with_deadline<F: Future>(at: Instant, fut: F) -> TimeoutFuture<F> {
    let timer = Timer::at(at);
    TimeoutFuture { timer, fut }
}
/// Extension trait offering method-call syntax for attaching a timeout or a
/// deadline to a value, yielding a [`TimeoutFuture`].
pub trait WithTimeout: Sized {
/// Output type associated with the implementor; the blanket impl for
/// futures sets it to the future's own output type.
type Output;
/// Consumes `self` and returns a [`TimeoutFuture`] limited by a relative
/// `timeout`.
fn with_timeout(self, timeout: Duration) -> TimeoutFuture<Self>;
/// Consumes `self` and returns a [`TimeoutFuture`] limited by the absolute
/// instant `at`.
fn with_deadline(self, at: Instant) -> TimeoutFuture<Self>;
}
// Blanket implementation: every `Future` gets `.with_timeout(..)` and
// `.with_deadline(..)`.
impl<F: Future> WithTimeout for F {
/// The output of the wrapped future.
type Output = F::Output;
/// Method form of the free function [`with_timeout`]; simply forwards to it
/// with the arguments swapped into that function's order.
fn with_timeout(self, timeout: Duration) -> TimeoutFuture<Self> {
with_timeout(timeout, self)
}
/// Method form of the free function [`with_deadline`]; simply forwards to
/// it with the arguments swapped into that function's order.
fn with_deadline(self, at: Instant) -> TimeoutFuture<Self> {
with_deadline(at, self)
}
}
/// Future that races a wrapped future against a [`Timer`].
///
/// Resolves to `Ok(output)` when the wrapped future completes, or to
/// `Err(TimeoutError)` when the timer fires first.
#[must_use = "futures do nothing unless you `.await` or poll them"]
#[derive(Debug)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub struct TimeoutFuture<F> {
/// Timer whose completion signals the timeout.
timer: Timer,
/// The wrapped future; it is polled through a pinned projection.
fut: F,
}
// `TimeoutFuture` may be moved after pinning only when the wrapped future
// may; the explicit impl ties `Unpin` to `F: Unpin`.
impl<F: Unpin> Unpin for TimeoutFuture<F> {}
impl<F: Future> Future for TimeoutFuture<F> {
    type Output = Result<F::Output, TimeoutError>;

    /// Polls the wrapped future first and the timer second.
    ///
    /// Returns `Ready(Ok(output))` as soon as the wrapped future completes,
    /// `Ready(Err(TimeoutError))` once the timer has fired, and `Pending`
    /// otherwise. Because the wrapped future is polled first, its output wins
    /// when both would be ready during the same poll.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // SAFETY: the mutable reference is only used to project to the two
        // fields below; this impl never moves anything out of `this`.
        let this = unsafe { self.get_unchecked_mut() };
        // SAFETY: `fut` is treated as structurally pinned. It is reached only
        // through this `Pin` projection and is not moved here, and
        // `TimeoutFuture<F>` is `Unpin` only when `F: Unpin`.
        let fut = unsafe { Pin::new_unchecked(&mut this.fut) };
        // `Timer` is `Unpin`, so it can be pinned without any `unsafe`.
        let timer = Pin::new(&mut this.timer);

        if let Poll::Ready(output) = fut.poll(cx) {
            return Poll::Ready(Ok(output));
        }
        if timer.poll(cx).is_ready() {
            return Poll::Ready(Err(TimeoutError));
        }
        Poll::Pending
    }
}
/// One-shot future that completes once a target [`Instant`] has been reached.
#[must_use = "futures do nothing unless you `.await` or poll them"]
#[derive(Debug)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub struct Timer {
/// Instant at (or after) which the timer reports completion.
expires_at: Instant,
/// Set once the timer has returned `Pending` from a poll; the timer never
/// reports completion before that has happened.
yielded_once: bool,
}
impl Timer {
    /// Creates a timer that expires at the instant `expires_at`.
    pub fn at(expires_at: Instant) -> Self {
        let yielded_once = false;
        Self {
            expires_at,
            yielded_once,
        }
    }

    /// Creates a timer that expires `duration` after the moment of this call.
    ///
    /// The expiry instant is computed immediately from `Instant::now()`, not
    /// on first poll.
    pub fn after(duration: Duration) -> Self {
        Self::at(Instant::now() + duration)
    }

    /// Creates a timer that expires after the given number of ticks.
    #[inline]
    pub fn after_ticks(ticks: u64) -> Self {
        let duration = Duration::from_ticks(ticks);
        Self::after(duration)
    }

    /// Creates a timer that expires after the given number of nanoseconds.
    #[inline]
    pub fn after_nanos(nanos: u64) -> Self {
        let duration = Duration::from_nanos(nanos);
        Self::after(duration)
    }

    /// Creates a timer that expires after the given number of microseconds.
    #[inline]
    pub fn after_micros(micros: u64) -> Self {
        let duration = Duration::from_micros(micros);
        Self::after(duration)
    }

    /// Creates a timer that expires after the given number of milliseconds.
    #[inline]
    pub fn after_millis(millis: u64) -> Self {
        let duration = Duration::from_millis(millis);
        Self::after(duration)
    }

    /// Creates a timer that expires after the given number of seconds.
    #[inline]
    pub fn after_secs(secs: u64) -> Self {
        let duration = Duration::from_secs(secs);
        Self::after(duration)
    }
}
// Explicitly marks `Timer` as `Unpin`, so it may be moved freely even after
// it has been pinned.
impl Unpin for Timer {}
impl Future for Timer {
    type Output = ();

    /// Completes once the timer has yielded at least once *and* the expiry
    /// instant has been reached.
    ///
    /// On every other poll (including the very first one, even if the expiry
    /// is already in the past) a wake-up is scheduled with the time driver for
    /// `expires_at` and `Pending` is returned.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // `Timer` is `Unpin`, so plain mutable access is fine.
        let timer = self.get_mut();

        // The clock is only read once the timer has already yielded.
        if timer.yielded_once && timer.expires_at <= Instant::now() {
            return Poll::Ready(());
        }

        embassy_time_driver::schedule_wake(timer.expires_at.as_ticks(), cx.waker());
        timer.yielded_once = true;
        Poll::Pending
    }
}
/// Periodic timer: each tick moves the next expiry forward by a fixed
/// `duration`.
#[derive(Debug)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub struct Ticker {
/// Instant at which the next tick becomes due.
expires_at: Instant,
/// Interval added to `expires_at` every time a tick is delivered.
duration: Duration,
}
impl Ticker {
    /// Creates a ticker whose first tick is due `duration` after this call and
    /// whose subsequent ticks are spaced `duration` apart.
    pub fn every(duration: Duration) -> Self {
        Self {
            expires_at: Instant::now() + duration,
            duration,
        }
    }

    /// Restarts the current period: the next tick becomes due one full
    /// `duration` from now.
    pub fn reset(&mut self) {
        let now = Instant::now();
        self.expires_at = now + self.duration;
    }

    /// Re-anchors the ticker on `deadline`: the next tick becomes due at
    /// `deadline + duration`.
    pub fn reset_at(&mut self, deadline: Instant) {
        let next_tick = deadline + self.duration;
        self.expires_at = next_tick;
    }

    /// Re-anchors the ticker `after` from now: the next tick becomes due at
    /// `now + after + duration`.
    pub fn reset_after(&mut self, after: Duration) {
        let anchor = Instant::now() + after;
        self.expires_at = anchor + self.duration;
    }

    /// Returns a future that completes when the next tick is due.
    ///
    /// On completion the expiry is advanced by exactly one `duration` from the
    /// previous expiry (not from the current time), so a ticker that has
    /// fallen behind delivers its overdue ticks back-to-back.
    pub fn next(&mut self) -> impl Future<Output = ()> + Send + Sync + '_ {
        poll_fn(|cx| {
            let due = self.expires_at <= Instant::now();
            if !due {
                // Not yet time: ask the driver to wake us at the expiry.
                embassy_time_driver::schedule_wake(self.expires_at.as_ticks(), cx.waker());
                return Poll::Pending;
            }

            let period = self.duration;
            self.expires_at += period;
            Poll::Ready(())
        })
    }
}
// Explicitly marks `Ticker` as `Unpin`, so it may be moved freely even after
// it has been pinned.
impl Unpin for Ticker {}
impl Stream for Ticker {
    type Item = ();

    /// Yields `Some(())` whenever a tick is due, advancing the expiry by one
    /// `duration`; otherwise schedules a wake-up at the expiry and returns
    /// `Pending`. This stream never yields `None`.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // `Ticker` is `Unpin`, so plain mutable access is fine.
        let ticker = self.get_mut();

        if ticker.expires_at <= Instant::now() {
            let period = ticker.duration;
            ticker.expires_at += period;
            return Poll::Ready(Some(()));
        }

        embassy_time_driver::schedule_wake(ticker.expires_at.as_ticks(), cx.waker());
        Poll::Pending
    }
}
impl FusedStream for Ticker {
/// Always `false`: the `Stream` implementation for `Ticker` only ever
/// returns `Pending` or `Ready(Some(()))`, so the stream never ends.
fn is_terminated(&self) -> bool {
false
}
}