safina-timer 0.1.11

Safe async timers
Documentation
//! [![crates.io version](https://img.shields.io/crates/v/safina-timer.svg)](https://crates.io/crates/safina-timer)
//! [![license: Apache 2.0](https://gitlab.com/leonhard-llc/safina-rs/-/raw/main/license-apache-2.0.svg)](http://www.apache.org/licenses/LICENSE-2.0)
//! [![unsafe forbidden](https://gitlab.com/leonhard-llc/safina-rs/-/raw/main/unsafe-forbidden-success.svg)](https://github.com/rust-secure-code/safety-dance/)
//! [![pipeline status](https://gitlab.com/leonhard-llc/safina-rs/badges/main/pipeline.svg)](https://gitlab.com/leonhard-llc/safina-rs/-/pipelines)
//!
//! Provides async [`sleep_for`](https://docs.rs/safina-timer/latest/safina_timer/fn.sleep_for.html)
//! and [`sleep_until`](https://docs.rs/safina-timer/latest/safina_timer/fn.sleep_until.html)
//! functions.
//!
//! This crate is part of [`safina`](https://crates.io/crates/safina),
//! a safe async runtime.
//!
//! # Features
//! - `forbid(unsafe_code)`
//! - Depends only on `std`
//! - 100% test coverage
//! - Source of time is
//!   [`std::thread::park_timeout`](https://doc.rust-lang.org/std/thread/fn.park_timeout.html)
//!   via
//!   [`std::sync::mpsc::Receiver::recv_timeout`](https://doc.rust-lang.org/std/sync/mpsc/struct.Receiver.html#method.recv_timeout).
//! - Works with [`safina-executor`](https://crates.io/crates/safina-executor)
//!   or any async executor
//!
//! # Limitations
//! - Building on `stable` requires the feature `once_cell`.
//!   This uses the [`once_cell`](https://crates.io/crates/once_cell) crate,
//!   which contains some unsafe code.
//!   This is necessary until
//!   [`std::lazy::OnceCell`](https://doc.rust-lang.org/std/lazy/struct.OnceCell.html)
//!   is stable.
//! - Timers complete around 2ms late, but never early
//! - Allocates memory
//!
//! # Examples
//! ```rust
//! # use core::time::Duration;
//! # async fn f() {
//! safina_timer::start_timer_thread();
//! let duration = Duration::from_secs(10);
//! safina_timer::sleep_for(duration).await;
//! # }
//! ```
//!
//! ```rust
//! # use core::time::Duration;
//! # use std::time::Instant;
//! # async fn f() {
//! safina_timer::start_timer_thread();
//! let deadline =
//!     Instant::now() + Duration::from_secs(1);
//! safina_timer::sleep_until(deadline).await;
//! # }
//! ```
//!
//! ```rust
//! # use core::time::Duration;
//! # use std::time::Instant;
//! # async fn read_request() -> Result<(), std::io::Error> { Ok(()) }
//! # async fn read_data(id: ()) -> Result<(), std::io::Error> { Ok(()) }
//! # fn process_data(data: ()) -> Result<(), std::io::Error> { Ok(()) }
//! # async fn write_data(data: ()) -> Result<(), std::io::Error> { Ok(()) }
//! # async fn send_response(response: ()) -> Result<(), std::io::Error> { Ok(()) }
//! # async fn f() -> Result<(), std::io::Error> {
//! safina_timer::start_timer_thread();
//! let deadline =
//!     Instant::now() + Duration::from_secs(1);
//! let req = safina_timer::with_deadline(
//!     read_request(), deadline).await??;
//! let data = safina_timer::with_deadline(
//!     read_data(req), deadline).await??;
//! safina_timer::with_deadline(
//!     write_data(data), deadline).await??;
//! safina_timer::with_deadline(
//!     send_response(data), deadline).await??;
//! # Ok(())
//! # }
//! ```
//!
//! ```rust
//! # use core::time::Duration;
//! # use std::time::Instant;
//! # async fn read_request() -> Result<(), std::io::Error> { Ok(()) }
//! # async fn read_data(id: ()) -> Result<(), std::io::Error> { Ok(()) }
//! # fn process_data(data: ()) -> Result<(), std::io::Error> { Ok(()) }
//! # async fn write_data(data: ()) -> Result<(), std::io::Error> { Ok(()) }
//! # async fn send_response(response: ()) -> Result<(), std::io::Error> { Ok(()) }
//! # async fn f() -> Result<(), std::io::Error> {
//! safina_timer::start_timer_thread();
//! let req = safina_timer::with_timeout(
//!     read_request(), Duration::from_secs(1)
//! ).await??;
//! let data = safina_timer::with_timeout(
//!     read_data(req), Duration::from_secs(2)
//! ).await??;
//! safina_timer::with_timeout(
//!     write_data(data), Duration::from_secs(2)
//! ).await??;
//! safina_timer::with_timeout(
//!     send_response(data),
//!     Duration::from_secs(1)
//! ).await??;
//! # Ok(())
//! # }
//! ```
//!
//! # Documentation
//! <https://docs.rs/safina-timer>
//!
//! # Alternatives
//! - [futures-timer](https://crates.io/crates/futures-timer)
//!   - popular
//!   - Supports: Wasm, Linux, Windows, macOS
//!   - Contains generous amounts of `unsafe` code
//!   - Uses `std::thread::park_timeout` as its source of time
//! - [async-io](https://crates.io/crates/async-io)
//!   - popular
//!   - single and repeating timers
//!   - Supports: Linux, Windows, macOS, iOS, Android, and many others.
//!   - Uses [polling](https://crates.io/crates/polling) crate
//!     which makes unsafe calls to OS.
//! - [async-timer](https://crates.io/crates/async-timer)
//!   - Supports: Linux & Android
//!   - Makes unsafe calls to OS
//! - [tokio](https://crates.io/crates/tokio)
//!   - very popular
//!   - single and repeating timers
//!   - Supports: Linux, macOS, other unix-like operating systems, Windows
//!   - Fast, internally complicated, and full of `unsafe`
//! - [embedded-async-timer](https://crates.io/crates/embedded-async-timer)
//!   - `no_std`
//!   - Supports `bare_metal`
//!
//! # Changelog
//! - v0.1.11 - Remove some type constraints.
//! - v0.1.10 - Use `safina-executor` v0.2.0.
//! - v0.1.9 - Name the timer thread.
//! - v0.1.8 - Increase test coverage
//! - v0.1.7 - Support stable with rust 1.51 and `once_cell`.
//! - v0.1.6 - Update dependencies
//! - v0.1.5 - Update docs
//! - v0.1.4 - Upgrade to new safina-executor version which removes need for `Box::pin`.
//! - v0.1.3 - Add badges to readme
//! - v0.1.2
//!   - Update [`with_deadline`](https://docs.rs/safina-timer/latest/safina_timer/fn.with_deadline.html)
//!     and [`with_timeout`](https://docs.rs/safina-timer/latest/safina_timer/fn.with_timeout.html):
//!     - Make them panic on `TimerThreadNotStarted` error and
//!       return new [`DeadlineExceeded`](https://docs.rs/safina-timer/latest/safina_timer/struct.DeadlineExceeded.html)
//!       struct instead of `DeadlineError` enum.
//!       This allows callers to write a match clause like `Err(DeadlineExceeded)`.
//!     - Make them use
//!       [`std::boxed::Box::pin`](https://doc.rust-lang.org/stable/std/boxed/struct.Box.html#method.pin)
//!       so callers don't have to.
//!   - Make [`sleep_until`](https://docs.rs/safina-timer/latest/safina_timer/fn.sleep_until.html)
//!     and [`sleep_for`](https://docs.rs/safina-timer/latest/safina_timer/fn.sleep_for.html)
//!     return `()` and
//!     panic if [`start_timer_thread()`](https://docs.rs/safina-timer/latest/safina_timer/fn.start_timer_thread.html) has not been called.
//! - v0.1.1
//!   - Use most recent waker passed to `SleepFuture::poll`, as required by the
//!     [`std::future::Future::poll`](https://doc.rust-lang.org/stable/std/future/trait.Future.html#tymethod.poll)
//!     contract.
//!   - Add [`with_deadline`](https://docs.rs/safina-timer/latest/safina_timer/fn.with_deadline.html)
//!     and [`with_timeout`](https://docs.rs/safina-timer/latest/safina_timer/fn.with_timeout.html)
//!     functions.
//! - v0.1.0 - First published version
//!
//! # TO DO
//! - Add a way to schedule jobs (`FnOnce` structs).
//!
//! # Release Process
//! 1. Edit `Cargo.toml` and bump version number.
//! 1. Run `./release.sh`
#![forbid(unsafe_code)]
#![cfg_attr(not(feature = "once_cell"), feature(once_cell))]

mod deadline_future;
pub use deadline_future::*;

mod sleep_future;
pub use sleep_future::*;

#[cfg(test)]
mod deadline_future_tests;
#[cfg(test)]
mod lib_tests;
#[cfg(test)]
mod sleep_future_tests;

use core::cmp::Reverse;
use core::fmt::{Debug, Display, Formatter};
use core::task::Waker;
use std::collections::BinaryHeap;
use std::error::Error;
use std::sync::mpsc::{Receiver, RecvTimeoutError, SyncSender};
use std::sync::{Arc, Mutex};
use std::time::Instant;

/// A request for the timer thread to invoke a waker at a given time.
///
/// Equality and ordering look only at `instant`, so a
/// `BinaryHeap<Reverse<ScheduledWake>>` yields the earliest deadline first.
#[derive(Debug)]
pub(crate) struct ScheduledWake {
    /// Time at which the waker should be invoked.
    instant: Instant,
    /// Shared slot holding the waker. [`ScheduledWake::wake`] empties it.
    waker: Arc<Mutex<Option<Waker>>>,
}
impl ScheduledWake {
    /// Takes the waker out of the shared slot and invokes it.
    /// Does nothing if the slot is already empty.
    ///
    /// # Panics
    /// Panics if the slot's mutex is poisoned.
    pub fn wake(&self) {
        // Take the waker in its own statement so the temporary `MutexGuard`
        // is dropped before `wake()` runs.  Writing this as
        // `if let Some(w) = self.waker.lock().unwrap().take() { w.wake() }`
        // keeps the guard alive for the whole `if let` body, which means the
        // waker is invoked while the slot is still locked; a waker that
        // touches the same slot synchronously would then fail to lock it.
        let taken = self.waker.lock().unwrap().take();
        if let Some(waker) = taken {
            waker.wake();
        }
    }
}
impl PartialEq for ScheduledWake {
    /// Two scheduled wakes are equal when their instants are equal.
    fn eq(&self, other: &Self) -> bool {
        self.instant == other.instant
    }
}
impl Eq for ScheduledWake {}
impl PartialOrd for ScheduledWake {
    /// Delegates to [`Ord::cmp`] so the two orderings can never disagree.
    fn partial_cmp(&self, other: &Self) -> Option<core::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
impl Ord for ScheduledWake {
    /// Orders scheduled wakes by their instants, earliest first.
    fn cmp(&self, other: &Self) -> core::cmp::Ordering {
        self.instant.cmp(&other.instant)
    }
}

// Process-wide sending half of the channel to the timer thread.
// `start_timer_thread` initializes it (at most once) and `schedule_wake`
// reads it; it is empty until `start_timer_thread` has been called.
//
// Exactly one of the two definitions below is compiled:
// - without the `once_cell` feature, the standard library's unstable
//   `std::lazy::SyncOnceCell` is used (the crate root enables the matching
//   nightly feature in that configuration);
// - with the `once_cell` feature, the `once_cell` crate's `sync::OnceCell`
//   is used instead.
//
// NOTE(review): `std::lazy::SyncOnceCell` was later stabilized as
// `std::sync::OnceLock` — confirm whether the nightly-only path still builds
// on current toolchains.
#[cfg(not(feature = "once_cell"))]
static TIMER_THREAD_SENDER: std::lazy::SyncOnceCell<SyncSender<ScheduledWake>> =
    std::lazy::SyncOnceCell::new();
#[cfg(feature = "once_cell")]
static TIMER_THREAD_SENDER: once_cell::sync::OnceCell<SyncSender<ScheduledWake>> =
    once_cell::sync::OnceCell::new();

/// Starts the timer worker thread, unless it has already been started.
///
/// Call this before using [`sleep_until`] or [`sleep_for`].
/// Calling it more than once is fine: only the first call spawns the thread.
///
/// The spawned thread is named `"safina_timer"`.
///
/// # Panics
/// Panics if the operating system fails to spawn the thread.
// The panic is documented above; the lint suppression is kept unchanged.
#[allow(clippy::missing_panics_doc)]
pub fn start_timer_thread() {
    // Runs at most once: creates the zero-capacity channel, hands the
    // receiving half to a freshly spawned worker and keeps the sending half.
    let spawn_worker = || {
        let (tx, rx) = std::sync::mpsc::sync_channel(0);
        let builder = std::thread::Builder::new().name(String::from("safina_timer"));
        builder.spawn(move || timer_thread(rx)).unwrap();
        tx
    };
    TIMER_THREAD_SENDER.get_or_init(spawn_worker);
}

/// Body of the timer worker thread; never returns.
///
/// Keeps every pending [`ScheduledWake`] in a min-heap keyed by deadline.
/// Each iteration does one of three things:
/// - with nothing pending, blocks until a request arrives;
/// - when the earliest deadline is already in the past, pops it and wakes it;
/// - otherwise waits for a new request, but no longer than the time left
///   until the earliest deadline.
// Takes the receiver by value because the thread owns it for its whole life.
#[allow(clippy::needless_pass_by_value)]
fn timer_thread(receiver: Receiver<ScheduledWake>) {
    let mut pending: BinaryHeap<Reverse<ScheduledWake>> = BinaryHeap::new();
    loop {
        // `Instant` is `Copy`, so copy the deadline out and release the
        // borrow of the heap right away.
        let next_deadline = match pending.peek() {
            Some(Reverse(earliest)) => earliest.instant,
            None => {
                pending.push(Reverse(receiver.recv().unwrap()));
                continue;
            }
        };
        let now = Instant::now();
        if next_deadline < now {
            pending.pop().unwrap().0.wake();
            continue;
        }
        // We can switch to recv_deadline once it is stable:
        // https://doc.rust-lang.org/std/sync/mpsc/struct.Receiver.html#method.recv_deadline
        let remaining = next_deadline.saturating_duration_since(now);
        match receiver.recv_timeout(remaining) {
            Ok(incoming) => pending.push(Reverse(incoming)),
            // Deadline reached: the next iteration pops and wakes.
            Err(RecvTimeoutError::Timeout) => {}
            Err(RecvTimeoutError::Disconnected) => unreachable!(),
        }
    }
}

/// Error returned when a wake-up is requested before the timer thread exists.
///
/// Call [`start_timer_thread`] to prevent this error.
#[derive(Debug, Eq, PartialEq)]
pub struct TimerThreadNotStarted {}
impl Error for TimerThreadNotStarted {}
impl Display for TimerThreadNotStarted {
    /// Writes the same text as the derived `Debug` implementation.
    fn fmt(&self, formatter: &mut Formatter<'_>) -> core::fmt::Result {
        Debug::fmt(self, formatter)
    }
}

/// Asks the timer thread to invoke the waker stored in `waker` at `instant`.
///
/// # Errors
/// Returns [`TimerThreadNotStarted`] if [`start_timer_thread`] has not been
/// called yet.
///
/// # Panics
/// Panics if sending to the timer thread fails.
fn schedule_wake(
    instant: Instant,
    waker: Arc<Mutex<Option<Waker>>>,
) -> Result<(), TimerThreadNotStarted> {
    match TIMER_THREAD_SENDER.get() {
        None => Err(TimerThreadNotStarted {}),
        Some(sender) => {
            let request = ScheduledWake { instant, waker };
            sender.send(request).unwrap();
            Ok(())
        }
    }
}