pingora_timeout/
lib.rs

1// Copyright 2025 Cloudflare, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#![warn(clippy::all)]
16
//! A drop-in replacement for [tokio::time::timeout] that is much more efficient.
18//!
19//! Similar to [tokio::time::timeout] but more efficient on busy concurrent IOs where timeouts are
20//! created and canceled very frequently.
21//!
//! This crate provides the following optimizations:
//! - Timeouts lazily initialize their timer when the future is pending for the first time.
24//! - There is no global lock for creating and cancelling timeouts.
25//! - Timeout timers are rounded to the next 10ms tick and timers are shared across all timeouts with the same deadline.
26//!
27//! Benchmark:
28//!
29//! 438.302µs total, 4ns avg per iteration
30//!
//! vs. Tokio `timeout()`:
32//!
33//! 10.716192ms total, 107ns avg per iteration
34//!
35
36pub mod fast_timeout;
37pub mod timer;
38
39pub use fast_timeout::fast_sleep as sleep;
40pub use fast_timeout::fast_timeout as timeout;
41
42use pin_project_lite::pin_project;
43use std::future::Future;
44use std::pin::Pin;
45use std::task::{self, Poll};
46use tokio::time::{sleep as tokio_sleep, Duration};
47
/// Factory interface for the timer future used by a timeout.
///
/// An implementor is built from the timeout duration and, on request, hands out
/// the future whose completion marks the timeout as expired.
///
/// Users don't need to interact with this trait.
pub trait ToTimeout {
    /// Builds the timer factory from the timeout duration `d`.
    fn create(d: Duration) -> Self;

    /// Returns a boxed, pinned future that resolves to `()` once the timer is done.
    fn timeout(&self) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>>;
}
55
56/// The timeout generated by [tokio_timeout()].
57///
58/// Users don't need to interact with this object.
59pub struct TokioTimeout(Duration);
60
61impl ToTimeout for TokioTimeout {
62    fn timeout(&self) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
63        Box::pin(tokio_sleep(self.0))
64    }
65
66    fn create(d: Duration) -> Self {
67        TokioTimeout(d)
68    }
69}
70
/// The error produced when the timer completes before the wrapped future does.
#[derive(Debug)]
pub struct Elapsed;

impl std::error::Error for Elapsed {}

impl std::fmt::Display for Elapsed {
    /// Writes the fixed message `Timeout Elapsed`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("Timeout Elapsed")
    }
}
82
83/// The [tokio::time::timeout] with just lazy timer initialization.
84///
85/// The timer is created the first time the `future` is pending. This avoids unnecessary timer
86/// creation and cancellation on busy IOs with a good chance to be already ready (e.g., reading
87/// data from TCP where the recv buffer already has a lot of data to read right away).
88pub fn tokio_timeout<T>(duration: Duration, future: T) -> Timeout<T, TokioTimeout>
89where
90    T: Future,
91{
92    Timeout::<T, TokioTimeout>::new_with_delay(future, duration)
93}
94
pin_project! {
    /// The timeout future returned by the timeout functions
    ///
    /// It resolves to `Ok` with the wrapped future's output, or to `Err(Elapsed)` when the
    /// timer future completes while the wrapped future is still pending.
    #[must_use = "futures do nothing unless you `.await` or poll them"]
    pub struct Timeout<T, F> {
        // the wrapped future; it is polled before the timer on every poll
        #[pin]
        value: T,
        // the timer future; stays `None` until `value` returns `Pending` for the first time
        #[pin]
        delay: Option<Pin<Box<dyn Future<Output = ()> + Send + Sync>>>,
        callback: F, // callback to create the timer
    }
}
106
107impl<T, F> Timeout<T, F>
108where
109    F: ToTimeout,
110{
111    pub(crate) fn new_with_delay(value: T, d: Duration) -> Timeout<T, F> {
112        Timeout {
113            value,
114            delay: None,
115            callback: F::create(d),
116        }
117    }
118}
119
120impl<T, F> Future for Timeout<T, F>
121where
122    T: Future,
123    F: ToTimeout,
124{
125    type Output = Result<T::Output, Elapsed>;
126
127    fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
128        let mut me = self.project();
129
130        // First, try polling the future
131        if let Poll::Ready(v) = me.value.poll(cx) {
132            return Poll::Ready(Ok(v));
133        }
134
135        let delay = me
136            .delay
137            .get_or_insert_with(|| Box::pin(me.callback.timeout()));
138
139        match delay.as_mut().poll(cx) {
140            Poll::Pending => Poll::Pending,
141            Poll::Ready(()) => Poll::Ready(Err(Elapsed {})),
142        }
143    }
144}
145
#[cfg(test)]
mod tests {
    use super::*;

    /// A 1s timeout around a 1000s sleep must report an error.
    #[tokio::test]
    async fn test_timeout() {
        let slow = tokio_sleep(Duration::from_secs(1000));
        let outcome = timeout(Duration::from_secs(1), slow).await;
        assert!(outcome.is_err());
    }

    /// A future that is ready on the first poll yields its value.
    #[tokio::test]
    async fn test_instantly_return() {
        let immediate = async { 1 };
        let outcome = timeout(Duration::from_secs(1), immediate).await;
        assert_eq!(outcome.unwrap(), 1);
    }

    /// A future that finishes after 1s, well inside a 1000s timeout, yields its value.
    #[tokio::test]
    async fn test_delayed_return() {
        let delayed = async {
            tokio_sleep(Duration::from_secs(1)).await;
            1
        };
        let outcome = timeout(Duration::from_secs(1000), delayed).await;
        assert_eq!(outcome.unwrap(), 1);
    }
}
173}