safina_timer/lib.rs
1//! # ARCHIVED ARCHIVED ARCHIVED
2//! This crate is archived and will not be updated.
3//!
4//! The code is now at
5//! [`safina::timer`](https://docs.rs/safina/latest/safina/timer/) in the
6//! [`safina`](https://crates.io/crates/safina) crate.
7//!
8//! ----
9//!
10//! # safina-timer
11//!
12//! Provides async [`sleep_for`](https://docs.rs/safina-timer/latest/safina_timer/fn.sleep_for.html)
13//! and [`sleep_until`](https://docs.rs/safina-timer/latest/safina_timer/fn.sleep_until.html)
14//! functions.
15//!
16//! This crate is part of [`safina`](https://crates.io/crates/safina),
17//! a safe async runtime.
18//!
19//! # Features
20//! - `forbid(unsafe_code)`
21//! - Depends only on `std`
22//! - 100% test coverage
23//! - Source of time is
24//! [`std::thread::park_timeout`](https://doc.rust-lang.org/std/thread/fn.park_timeout.html)
25//! via
26//! [`std::sync::mpsc::Receiver::recv_timeout`](https://doc.rust-lang.org/std/sync/mpsc/struct.Receiver.html#method.recv_timeout).
27//! - Works with [`safina-executor`](https://crates.io/crates/safina-executor)
28//! or any async executor
29//!
30//! # Limitations
31//! - Building on `stable` requires the feature `once_cell`.
//! This uses the [`once_cell`](https://crates.io/crates/once_cell) crate,
//! which contains some unsafe code.
34//! This is necessary until
35//! [`std::lazy::OnceCell`](https://doc.rust-lang.org/std/lazy/struct.OnceCell.html)
36//! is stable.
37//! - Timers complete around 2ms late, but never early
38//! - Allocates memory
39//!
40//! # Examples
41//! ```rust
42//! # use core::time::Duration;
43//! # async fn f() {
44//! safina_timer::start_timer_thread();
45//! let duration = Duration::from_secs(10);
46//! safina_timer::sleep_for(duration).await;
47//! # }
48//! ```
49//!
50//! ```rust
51//! # use core::time::Duration;
52//! # use std::time::Instant;
53//! # async fn f() {
54//! safina_timer::start_timer_thread();
55//! let deadline =
56//! Instant::now() + Duration::from_secs(1);
57//! safina_timer::sleep_until(deadline).await;
58//! # }
59//! ```
60//!
61//! ```rust
62//! # use core::time::Duration;
63//! # use std::time::Instant;
64//! # async fn read_request() -> Result<(), std::io::Error> { Ok(()) }
65//! # async fn read_data(id: ()) -> Result<(), std::io::Error> { Ok(()) }
66//! # fn process_data(data: ()) -> Result<(), std::io::Error> { Ok(()) }
67//! # async fn write_data(data: ()) -> Result<(), std::io::Error> { Ok(()) }
68//! # async fn send_response(response: ()) -> Result<(), std::io::Error> { Ok(()) }
69//! # async fn f() -> Result<(), std::io::Error> {
70//! safina_timer::start_timer_thread();
71//! let deadline =
72//! Instant::now() + Duration::from_secs(1);
73//! let req = safina_timer::with_deadline(
74//! read_request(), deadline).await??;
75//! let data = safina_timer::with_deadline(
76//! read_data(req), deadline).await??;
77//! safina_timer::with_deadline(
78//! write_data(data), deadline ).await??;
79//! safina_timer::with_deadline(
80//! send_response(data), deadline).await??;
81//! # Ok(())
82//! # }
83//! ```
84//!
85//! ```rust
86//! # use core::time::Duration;
87//! # use std::time::Instant;
88//! # async fn read_request() -> Result<(), std::io::Error> { Ok(()) }
89//! # async fn read_data(id: ()) -> Result<(), std::io::Error> { Ok(()) }
90//! # fn process_data(data: ()) -> Result<(), std::io::Error> { Ok(()) }
91//! # async fn write_data(data: ()) -> Result<(), std::io::Error> { Ok(()) }
92//! # async fn send_response(response: ()) -> Result<(), std::io::Error> { Ok(()) }
93//! # async fn f() -> Result<(), std::io::Error> {
94//! safina_timer::start_timer_thread();
95//! let req = safina_timer::with_timeout(
96//! read_request(), Duration::from_secs(1)
97//! ).await??;
98//! let data = safina_timer::with_timeout(
99//! read_data(req), Duration::from_secs(2)
100//! ).await??;
101//! safina_timer::with_timeout(
102//! write_data(data), Duration::from_secs(2)
103//! ).await??;
104//! safina_timer::with_timeout(
105//! send_response(data),
106//! Duration::from_secs(1)
107//! ).await??;
108//! # Ok(())
109//! # }
110//! ```
111//!
112//! # Documentation
113//! <https://docs.rs/safina-timer>
114//!
115//! # Alternatives
116//! - [futures-timer](https://crates.io/crates/futures-timer)
117//! - popular
118//! - Supports: Wasm, Linux, Windows, macOS
119//! - Contains generous amounts of `unsafe` code
120//! - Uses `std::thread::park_timeout` as its source of time
121//! - [async-io](https://crates.io/crates/async-io)
122//! - popular
123//! - single and repeating timers
124//! - Supports: Linux, Windows, macOS, iOS, Android, and many others.
125//! - Uses [polling](https://crates.io/crates/polling) crate
126//! which makes unsafe calls to OS.
127//! - [async-timer](https://crates.io/crates/async-timer)
128//! - Supports: Linux & Android
129//! - Makes unsafe calls to OS
130//! - [tokio](https://crates.io/crates/tokio)
131//! - very popular
132//! - single and repeating timers
133//! - Supports: Linux, macOS, other unix-like operating systems, Windows
134//! - Fast, internally complicated, and full of `unsafe`
135//! - [embedded-async-timer](https://crates.io/crates/embedded-async-timer)
136//! - `no_std`
137//! - Supports `bare_metal`
138//!
139//! # Changelog
140//! - v0.1.11 - Remove some type constraints.
141//! - v0.1.10 - Use `safina-executor` v0.2.0.
142//! - v0.1.9 - Name the timer thread.
143//! - v0.1.8 - Increase test coverage
144//! - v0.1.7 - Support stable with rust 1.51 and `once_cell`.
145//! - v0.1.6 - Update dependencies
146//! - v0.1.5 - Update docs
147//! - v0.1.4 - Upgrade to new safina-executor version which removes need for `Box::pin`.
148//! - v0.1.3 - Add badges to readme
149//! - v0.1.2
150//! - Update [`with_deadline`](https://docs.rs/safina-timer/latest/safina_timer/fn.with_deadline.html)
151//! and [`with_timeout`](https://docs.rs/safina-timer/latest/safina_timer/fn.with_timeout.html):
152//! - Make them panic on `TimerThreadNotStarted` error and
153//! return new [`DeadlineExceeded`](https://docs.rs/safina-timer/latest/safina_timer/struct.DeadlineExceeded.html)
154//! struct instead of `DeadlineError` enum.
155//! This allows callers to write a match clause like `Err(DeadlineExceeded)`.
156//! - Make them use
157//! [`std::boxed::Box::pin`](https://doc.rust-lang.org/stable/std/boxed/struct.Box.html#method.pin)
158//! so callers don't have to.
159//! - Make [`sleep_until`](https://docs.rs/safina-timer/latest/safina_timer/fn.sleep_until.html)
160//! and [`sleep_for`](https://docs.rs/safina-timer/latest/safina_timer/fn.sleep_for.html)
161//! return `()` and
162//! panic if [`start_timer_thread()`](fn.start_timer_thread.html) has not been called.
163//! - v0.1.1
164//! - Use most recent waker passed to `SleepFuture::poll`, as required by the
165//! [`std::future::Future::poll`](https://doc.rust-lang.org/stable/std/future/trait.Future.html#tymethod.poll)
166//! contract.
167//! - Add [`with_deadline`](https://docs.rs/safina-timer/latest/safina_timer/fn.with_deadline.html)
168//! and [`with_timeout`](https://docs.rs/safina-timer/latest/safina_timer/fn.with_timeout.html)
169//! functions.
170//! - v0.1.0 - First published version
171//!
172//! # TO DO
173//! - Add a way to schedule jobs (`FnOnce` structs).
174//!
175//! # Release Process
176//! 1. Edit `Cargo.toml` and bump version number.
177//! 1. Run `./release.sh`
178#![forbid(unsafe_code)]
179#![cfg_attr(not(feature = "once_cell"), feature(once_cell))]
180
181mod deadline_future;
182pub use deadline_future::*;
183
184mod sleep_future;
185pub use sleep_future::*;
186
187#[cfg(test)]
188mod deadline_future_tests;
189#[cfg(test)]
190mod lib_tests;
191#[cfg(test)]
192mod sleep_future_tests;
193
194use core::cmp::Reverse;
195use core::fmt::{Debug, Display, Formatter};
196use core::task::Waker;
197use std::collections::BinaryHeap;
198use std::error::Error;
199use std::sync::mpsc::{Receiver, RecvTimeoutError, SyncSender};
200use std::sync::{Arc, Mutex};
201use std::time::Instant;
202
/// A request for the timer thread to wake a task at a particular time.
///
/// Equality and ordering look at `instant` only; the `waker` slot takes no
/// part in comparisons. This lets values be kept in a heap keyed by deadline.
#[derive(Debug)]
pub(crate) struct ScheduledWake {
    /// The time after which the waker should be invoked.
    instant: Instant,
    /// Shared slot holding the waker to invoke, if any.
    waker: Arc<Mutex<Option<Waker>>>,
}
impl ScheduledWake {
    /// Takes the waker out of the shared slot and wakes it.
    /// Does nothing when the slot is empty.
    ///
    /// The slot's lock is released before the waker runs, so a waker that
    /// touches the same slot cannot deadlock against this call.
    ///
    /// # Panics
    /// Panics if the slot's mutex is poisoned.
    pub fn wake(&self) {
        // Take the waker in its own statement: the temporary `MutexGuard` is
        // dropped at the end of this `let`. Writing the lock expression
        // directly in an `if let` scrutinee would keep the guard alive for
        // the whole body, and the waker would run with the mutex still held.
        let taken = self.waker.lock().unwrap().take();
        if let Some(waker) = taken {
            waker.wake();
        }
    }
}
impl PartialEq for ScheduledWake {
    /// Two scheduled wakes are equal when their instants are equal.
    fn eq(&self, other: &Self) -> bool {
        self.instant == other.instant
    }
}
impl Eq for ScheduledWake {}
impl PartialOrd for ScheduledWake {
    /// Delegates to [`Ord::cmp`] so the two orderings can never disagree.
    fn partial_cmp(&self, other: &Self) -> Option<core::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
impl Ord for ScheduledWake {
    /// Orders scheduled wakes by their instants; earlier instants sort first.
    fn cmp(&self, other: &Self) -> core::cmp::Ordering {
        self.instant.cmp(&other.instant)
    }
}
231
232#[cfg(not(feature = "once_cell"))]
233static TIMER_THREAD_SENDER: std::lazy::SyncOnceCell<SyncSender<ScheduledWake>> =
234 std::lazy::SyncOnceCell::new();
235#[cfg(feature = "once_cell")]
236static TIMER_THREAD_SENDER: once_cell::sync::OnceCell<SyncSender<ScheduledWake>> =
237 once_cell::sync::OnceCell::new();
238
239/// Starts the worker thread, if it's not already started.
240/// You must call this before calling [`sleep_until`] or [`sleep_for`].
241///
242/// The thread is called `"safina_timer"`.
243#[allow(clippy::missing_panics_doc)]
244pub fn start_timer_thread() {
245 TIMER_THREAD_SENDER.get_or_init(|| {
246 let (sender, receiver) = std::sync::mpsc::sync_channel(0);
247 std::thread::Builder::new()
248 .name("safina_timer".to_string())
249 .spawn(|| timer_thread(receiver))
250 .unwrap();
251 sender
252 });
253}
254
255#[allow(clippy::needless_pass_by_value)]
256fn timer_thread(receiver: Receiver<ScheduledWake>) {
257 let mut heap: BinaryHeap<Reverse<ScheduledWake>> = BinaryHeap::new();
258 loop {
259 if let Some(Reverse(peeked_wake)) = heap.peek() {
260 let now = Instant::now();
261 if peeked_wake.instant < now {
262 heap.pop().unwrap().0.wake();
263 } else {
264 // We can switch to recv_deadline once it is stable:
265 // https://doc.rust-lang.org/std/sync/mpsc/struct.Receiver.html#method.recv_deadline
266 match receiver.recv_timeout(peeked_wake.instant.saturating_duration_since(now)) {
267 Ok(new_wake) => {
268 heap.push(Reverse(new_wake));
269 }
270 Err(RecvTimeoutError::Timeout) => {}
271 Err(RecvTimeoutError::Disconnected) => unreachable!(),
272 }
273 }
274 } else {
275 heap.push(Reverse(receiver.recv().unwrap()));
276 }
277 }
278}
279
/// The error returned when the timer thread has not been started.
/// Call [`start_timer_thread`] to prevent this error.
#[derive(Debug, Eq, PartialEq)]
pub struct TimerThreadNotStarted {}

impl Display for TimerThreadNotStarted {
    /// Writes the same text as the derived `Debug` implementation.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        Debug::fmt(self, f)
    }
}

impl Error for TimerThreadNotStarted {}
289
290fn schedule_wake(
291 instant: Instant,
292 waker: Arc<Mutex<Option<Waker>>>,
293) -> Result<(), TimerThreadNotStarted> {
294 let sender = TIMER_THREAD_SENDER.get().ok_or(TimerThreadNotStarted {})?;
295 sender.send(ScheduledWake { instant, waker }).unwrap();
296 Ok(())
297}