safina_timer/
lib.rs

1//! # ARCHIVED ARCHIVED ARCHIVED
2//! This crate is archived and will not be updated.
3//!
4//! The code is now at
5//! [`safina::timer`](https://docs.rs/safina/latest/safina/timer/) in the
6//! [`safina`](https://crates.io/crates/safina) crate.
7//!
8//! ----
9//!
10//! # safina-timer
11//!
12//! Provides async [`sleep_for`](https://docs.rs/safina-timer/latest/safina_timer/fn.sleep_for.html)
13//! and [`sleep_until`](https://docs.rs/safina-timer/latest/safina_timer/fn.sleep_until.html)
14//! functions.
15//!
16//! This crate is part of [`safina`](https://crates.io/crates/safina),
17//! a safe async runtime.
18//!
19//! # Features
20//! - `forbid(unsafe_code)`
21//! - Depends only on `std`
22//! - 100% test coverage
23//! - Source of time is
24//!   [`std::thread::park_timeout`](https://doc.rust-lang.org/std/thread/fn.park_timeout.html)
25//!   via
26//!   [`std::sync::mpsc::Receiver::recv_timeout`](https://doc.rust-lang.org/std/sync/mpsc/struct.Receiver.html#method.recv_timeout).
27//! - Works with [`safina-executor`](https://crates.io/crates/safina-executor)
28//!   or any async executor
29//!
30//! # Limitations
31//! - Building on `stable` requires the feature `once_cell`.
32//!   This uses the [`once_cell`](https://crates.io/crates/once_cell) crate
33//!   which contains some unsafe code.
34//!   This is necessary until
35//!   [`std::lazy::OnceCell`](https://doc.rust-lang.org/std/lazy/struct.OnceCell.html)
36//!   is stable.
37//! - Timers complete around 2ms late, but never early
38//! - Allocates memory
39//!
40//! # Examples
41//! ```rust
42//! # use core::time::Duration;
43//! # async fn f() {
44//! safina_timer::start_timer_thread();
45//! let duration = Duration::from_secs(10);
46//! safina_timer::sleep_for(duration).await;
47//! # }
48//! ```
49//!
50//! ```rust
51//! # use core::time::Duration;
52//! # use std::time::Instant;
53//! # async fn f() {
54//! safina_timer::start_timer_thread();
55//! let deadline =
56//!     Instant::now() + Duration::from_secs(1);
57//! safina_timer::sleep_until(deadline).await;
58//! # }
59//! ```
60//!
61//! ```rust
62//! # use core::time::Duration;
63//! # use std::time::Instant;
64//! # async fn read_request() -> Result<(), std::io::Error> { Ok(()) }
65//! # async fn read_data(id: ()) -> Result<(), std::io::Error> { Ok(()) }
66//! # fn process_data(data: ()) -> Result<(), std::io::Error> { Ok(()) }
67//! # async fn write_data(data: ()) -> Result<(), std::io::Error> { Ok(()) }
68//! # async fn send_response(response: ()) -> Result<(), std::io::Error> { Ok(()) }
69//! # async fn f() -> Result<(), std::io::Error> {
70//! safina_timer::start_timer_thread();
71//! let deadline =
72//!     Instant::now() + Duration::from_secs(1);
73//! let req = safina_timer::with_deadline(
74//!     read_request(), deadline).await??;
75//! let data = safina_timer::with_deadline(
76//!     read_data(req), deadline).await??;
77//! safina_timer::with_deadline(
78//!     write_data(data), deadline ).await??;
79//! safina_timer::with_deadline(
80//!     send_response(data), deadline).await??;
81//! # Ok(())
82//! # }
83//! ```
84//!
85//! ```rust
86//! # use core::time::Duration;
87//! # use std::time::Instant;
88//! # async fn read_request() -> Result<(), std::io::Error> { Ok(()) }
89//! # async fn read_data(id: ()) -> Result<(), std::io::Error> { Ok(()) }
90//! # fn process_data(data: ()) -> Result<(), std::io::Error> { Ok(()) }
91//! # async fn write_data(data: ()) -> Result<(), std::io::Error> { Ok(()) }
92//! # async fn send_response(response: ()) -> Result<(), std::io::Error> { Ok(()) }
93//! # async fn f() -> Result<(), std::io::Error> {
94//! safina_timer::start_timer_thread();
95//! let req = safina_timer::with_timeout(
96//!     read_request(), Duration::from_secs(1)
97//! ).await??;
98//! let data = safina_timer::with_timeout(
99//!     read_data(req), Duration::from_secs(2)
100//! ).await??;
101//! safina_timer::with_timeout(
102//!     write_data(data), Duration::from_secs(2)
103//! ).await??;
104//! safina_timer::with_timeout(
105//!     send_response(data),
106//!     Duration::from_secs(1)
107//! ).await??;
108//! # Ok(())
109//! # }
110//! ```
111//!
112//! # Documentation
113//! <https://docs.rs/safina-timer>
114//!
115//! # Alternatives
116//! - [futures-timer](https://crates.io/crates/futures-timer)
117//!   - popular
118//!   - Supports: Wasm, Linux, Windows, macOS
119//!   - Contains generous amounts of `unsafe` code
120//!   - Uses `std::thread::park_timeout` as its source of time
121//! - [async-io](https://crates.io/crates/async-io)
122//!   - popular
123//!   - single and repeating timers
124//!   - Supports: Linux, Windows, macOS, iOS, Android, and many others.
125//!   - Uses [polling](https://crates.io/crates/polling) crate
126//!     which makes unsafe calls to the OS.
127//! - [async-timer](https://crates.io/crates/async-timer)
128//!   - Supports: Linux & Android
129//!   - Makes unsafe calls to the OS
130//! - [tokio](https://crates.io/crates/tokio)
131//!   - very popular
132//!   - single and repeating timers
133//!   - Supports: Linux, macOS, other unix-like operating systems, Windows
134//!   - Fast, internally complicated, and full of `unsafe`
135//! - [embedded-async-timer](https://crates.io/crates/embedded-async-timer)
136//!   - `no_std`
137//!   - Supports `bare_metal`
138//!
139//! # Changelog
140//! - v0.1.11 - Remove some type constraints.
141//! - v0.1.10 - Use `safina-executor` v0.2.0.
142//! - v0.1.9 - Name the timer thread.
143//! - v0.1.8 - Increase test coverage
144//! - v0.1.7 - Support stable with rust 1.51 and `once_cell`.
145//! - v0.1.6 - Update dependencies
146//! - v0.1.5 - Update docs
147//! - v0.1.4 - Upgrade to new safina-executor version which removes need for `Box::pin`.
148//! - v0.1.3 - Add badges to readme
149//! - v0.1.2
150//!   - Update [`with_deadline`](https://docs.rs/safina-timer/latest/safina_timer/fn.with_deadline.html)
151//!     and [`with_timeout`](https://docs.rs/safina-timer/latest/safina_timer/fn.with_timeout.html):
152//!     - Make them panic on `TimerThreadNotStarted` error and
153//!       return new [`DeadlineExceeded`](https://docs.rs/safina-timer/latest/safina_timer/struct.DeadlineExceeded.html)
154//!       struct instead of `DeadlineError` enum.
155//!       This allows callers to write a match clause like `Err(DeadlineExceeded)`.
156//!     - Make them use
157//!       [`std::boxed::Box::pin`](https://doc.rust-lang.org/stable/std/boxed/struct.Box.html#method.pin)
158//!       so callers don't have to.
159//!   - Make [`sleep_until`](https://docs.rs/safina-timer/latest/safina_timer/fn.sleep_until.html)
160//!     and [`sleep_for`](https://docs.rs/safina-timer/latest/safina_timer/fn.sleep_for.html)
161//!     return `()` and
162//!     panic if [`start_timer_thread()`](fn.start_timer_thread.html) has not been called.
163//! - v0.1.1
164//!   - Use most recent waker passed to `SleepFuture::poll`, as required by the
165//!     [`std::future::Future::poll`](https://doc.rust-lang.org/stable/std/future/trait.Future.html#tymethod.poll)
166//!     contract.
167//!   - Add [`with_deadline`](https://docs.rs/safina-timer/latest/safina_timer/fn.with_deadline.html)
168//!     and [`with_timeout`](https://docs.rs/safina-timer/latest/safina_timer/fn.with_timeout.html)
169//!     functions.
170//! - v0.1.0 - First published version
171//!
172//! # TO DO
173//! - Add a way to schedule jobs (`FnOnce` structs).
174//!
175//! # Release Process
176//! 1. Edit `Cargo.toml` and bump version number.
177//! 1. Run `./release.sh`
178#![forbid(unsafe_code)]
179#![cfg_attr(not(feature = "once_cell"), feature(once_cell))]
180
181mod deadline_future;
182pub use deadline_future::*;
183
184mod sleep_future;
185pub use sleep_future::*;
186
187#[cfg(test)]
188mod deadline_future_tests;
189#[cfg(test)]
190mod lib_tests;
191#[cfg(test)]
192mod sleep_future_tests;
193
194use core::cmp::Reverse;
195use core::fmt::{Debug, Display, Formatter};
196use core::task::Waker;
197use std::collections::BinaryHeap;
198use std::error::Error;
199use std::sync::mpsc::{Receiver, RecvTimeoutError, SyncSender};
200use std::sync::{Arc, Mutex};
201use std::time::Instant;
202
/// A request for the timer thread to wake a task once a given instant has passed.
#[derive(Debug)]
pub(crate) struct ScheduledWake {
    /// The moment after which the wake-up should be delivered.
    instant: Instant,
    /// Shared slot holding the waker to call.
    /// It holds `None` when no waker is stored or after the waker has been taken.
    waker: Arc<Mutex<Option<Waker>>>,
}
impl ScheduledWake {
    /// Takes the waker out of the shared slot, if one is present, and wakes it.
    ///
    /// The slot is left holding `None`, so calling this again does nothing
    /// until a new waker is stored.
    ///
    /// The mutex is released *before* the waker is invoked, so a waker that
    /// locks the same slot while running cannot deadlock.
    ///
    /// # Panics
    /// Panics if the slot's mutex is poisoned.
    pub fn wake(&self) {
        // Take the waker in its own statement: the `MutexGuard` temporary is
        // dropped at the end of this `let`.  Writing the lock inside the
        // `if let` scrutinee would keep the guard alive for the whole body,
        // i.e. across the call to `Waker::wake`.
        let taken = self.waker.lock().unwrap().take();
        if let Some(waker) = taken {
            waker.wake();
        }
    }
}
215impl PartialEq for ScheduledWake {
216    fn eq(&self, other: &Self) -> bool {
217        std::cmp::PartialEq::eq(&self.instant, &other.instant)
218    }
219}
220impl Eq for ScheduledWake {}
221impl PartialOrd for ScheduledWake {
222    fn partial_cmp(&self, other: &Self) -> Option<core::cmp::Ordering> {
223        core::cmp::PartialOrd::partial_cmp(&self.instant, &other.instant)
224    }
225}
226impl Ord for ScheduledWake {
227    fn cmp(&self, other: &Self) -> core::cmp::Ordering {
228        std::cmp::Ord::cmp(&self.instant, &other.instant)
229    }
230}
231
232#[cfg(not(feature = "once_cell"))]
233static TIMER_THREAD_SENDER: std::lazy::SyncOnceCell<SyncSender<ScheduledWake>> =
234    std::lazy::SyncOnceCell::new();
235#[cfg(feature = "once_cell")]
236static TIMER_THREAD_SENDER: once_cell::sync::OnceCell<SyncSender<ScheduledWake>> =
237    once_cell::sync::OnceCell::new();
238
239/// Starts the worker thread, if it's not already started.
240/// You must call this before calling [`sleep_until`] or [`sleep_for`].
241///
242/// The thread is called `"safina_timer"`.
243#[allow(clippy::missing_panics_doc)]
244pub fn start_timer_thread() {
245    TIMER_THREAD_SENDER.get_or_init(|| {
246        let (sender, receiver) = std::sync::mpsc::sync_channel(0);
247        std::thread::Builder::new()
248            .name("safina_timer".to_string())
249            .spawn(|| timer_thread(receiver))
250            .unwrap();
251        sender
252    });
253}
254
255#[allow(clippy::needless_pass_by_value)]
256fn timer_thread(receiver: Receiver<ScheduledWake>) {
257    let mut heap: BinaryHeap<Reverse<ScheduledWake>> = BinaryHeap::new();
258    loop {
259        if let Some(Reverse(peeked_wake)) = heap.peek() {
260            let now = Instant::now();
261            if peeked_wake.instant < now {
262                heap.pop().unwrap().0.wake();
263            } else {
264                // We can switch to recv_deadline once it is stable:
265                // https://doc.rust-lang.org/std/sync/mpsc/struct.Receiver.html#method.recv_deadline
266                match receiver.recv_timeout(peeked_wake.instant.saturating_duration_since(now)) {
267                    Ok(new_wake) => {
268                        heap.push(Reverse(new_wake));
269                    }
270                    Err(RecvTimeoutError::Timeout) => {}
271                    Err(RecvTimeoutError::Disconnected) => unreachable!(),
272                }
273            }
274        } else {
275            heap.push(Reverse(receiver.recv().unwrap()));
276        }
277    }
278}
279
/// Error reported when the timer thread's sender has not been initialized.
///
/// Call [`start_timer_thread`] to prevent this error.
#[derive(Debug, Eq, PartialEq)]
pub struct TimerThreadNotStarted {}
impl Display for TimerThreadNotStarted {
    /// Writes the same text as the `Debug` representation.
    fn fmt(&self, formatter: &mut Formatter<'_>) -> core::fmt::Result {
        Debug::fmt(self, formatter)
    }
}
impl Error for TimerThreadNotStarted {}
289
290fn schedule_wake(
291    instant: Instant,
292    waker: Arc<Mutex<Option<Waker>>>,
293) -> Result<(), TimerThreadNotStarted> {
294    let sender = TIMER_THREAD_SENDER.get().ok_or(TimerThreadNotStarted {})?;
295    sender.send(ScheduledWake { instant, waker }).unwrap();
296    Ok(())
297}