Skip to main content

do_over/
hedge.rs

1//! Hedge policy for reducing tail latency.
2//!
3//! The hedge policy starts a backup request if the primary request is slow,
4//! returning whichever completes first. This reduces tail latency at the cost
5//! of potentially sending more requests.
6//!
7//! # How It Works
8//!
9//! 1. Start the primary request
10//! 2. After a configured delay, start a hedged (backup) request
11//! 3. Return whichever completes first
12//! 4. Cancel the slower request
13//!
14//! # Important
15//!
16//! Only use hedging with **idempotent** operations (safe to execute multiple times).
17//!
18//! # Examples
19//!
20//! ```rust
21//! use do_over::{policy::Policy, hedge::Hedge, error::DoOverError};
22//! use std::time::Duration;
23//!
24//! # async fn example() -> Result<(), DoOverError<std::io::Error>> {
25//! // Start backup request after 100ms if primary hasn't completed
26//! let hedge = Hedge::new(Duration::from_millis(100));
27//!
28//! let result = hedge.execute(|| async {
29//!     Ok::<_, DoOverError<std::io::Error>>("completed")
30//! }).await?;
31//! # Ok(())
32//! # }
33//! ```
34
35use std::time::Duration;
36use tokio::time::sleep;
37use crate::{policy::Policy, error::DoOverError};
38
39/// A policy that sends backup requests to reduce tail latency.
40///
41/// After a configured delay, if the primary request hasn't completed,
42/// a hedge (backup) request is started. The first response wins.
43///
44/// # Warning
45///
46/// Only use with idempotent operations. Using hedging with non-idempotent
47/// operations (like payment processing) can cause duplicate effects.
48///
49/// # Examples
50///
51/// ```rust
52/// use do_over::{policy::Policy, hedge::Hedge, error::DoOverError};
53/// use std::time::Duration;
54///
55/// # async fn example() {
56/// // Good use case: read operations
57/// let hedge = Hedge::new(Duration::from_millis(100));
58///
59/// // The hedge request starts if primary takes > 100ms
60/// let result: Result<String, DoOverError<String>> = hedge.execute(|| async {
61///     Ok("data".to_string())
62/// }).await;
63/// # }
64/// ```
65#[derive(Clone)]
66pub struct Hedge {
67    delay: Duration,
68}
69
70impl Hedge {
71    /// Create a new hedge policy.
72    ///
73    /// # Arguments
74    ///
75    /// * `delay` - How long to wait before starting the hedge request
76    ///
77    /// # Examples
78    ///
79    /// ```rust
80    /// use do_over::hedge::Hedge;
81    /// use std::time::Duration;
82    ///
83    /// // Start hedge after 100ms
84    /// let hedge = Hedge::new(Duration::from_millis(100));
85    /// ```
86    pub fn new(delay: Duration) -> Self {
87        Self { delay }
88    }
89}
90
91#[async_trait::async_trait]
92impl<E> Policy<DoOverError<E>> for Hedge
93where
94    E: Send + Sync,
95{
96    async fn execute<F, Fut, T>(&self, f: F) -> Result<T, DoOverError<E>>
97    where
98        F: Fn() -> Fut + Send + Sync,
99        Fut: std::future::Future<Output = Result<T, DoOverError<E>>> + Send,
100        T: Send,
101    {
102        // Use tokio::select! with futures instead of spawning tasks
103        tokio::select! {
104            r = f() => r,
105            r = async {
106                sleep(self.delay).await;
107                f().await
108            } => r,
109        }
110    }
111}