// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
//! Hedging resilience middleware for reducing tail latency via additional concurrent execution.
//!
//! This module provides hedging capabilities that launch additional concurrent requests
//! to reduce the impact of slow responses. The primary types are [`Hedging`] and [`HedgingLayer`]:
//!
//! - [`Hedging`] is the middleware that wraps an inner service and launches parallel hedging requests
//! - [`HedgingLayer`] is used to configure and construct the hedging middleware
//!
//! # Quick Start
//!
//! ```rust
//! # use std::time::Duration;
//! # use tick::Clock;
//! # use layered::{Execute, Service, Stack};
//! # use seatbelt::hedging::Hedging;
//! # use seatbelt::{RecoveryInfo, ResilienceContext};
//! # async fn example(clock: Clock) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
//! let context = ResilienceContext::new(&clock).name("my_service");
//!
//! let stack = (
//!     Hedging::layer("hedging", &context)
//!         .clone_input()
//!         .recovery_with(|result, _| match result {
//!             Ok(_) => RecoveryInfo::never(),
//!             Err(_) => RecoveryInfo::retry(),
//!         })
//!         .hedging_delay(Duration::from_secs(1)),
//!     Execute::new(my_operation),
//! );
//!
//! let service = stack.into_service();
//! let result = service.execute("input".to_string()).await;
//! # let _result = result;
//! # Ok(())
//! # }
//! # async fn my_operation(input: String) -> Result<String, String> { Ok(input) }
//! ```
//!
//! # How It Works
//!
//! Hedging sends the original request immediately. Based on the configured delay:
//!
//! - **Zero delay**: All hedged requests launch at once (via `hedging_delay(Duration::ZERO)`)
//! - **Fixed delay**: Each hedging attempt launches after a fixed delay if no acceptable result
//! has arrived (via [`hedging_delay`][HedgingLayer::hedging_delay])
//! - **Dynamic delay**: The delay is computed per hedging attempt via a user-provided callback
//! that also receives a reference to the input (via
//! [`hedging_delay_with`][HedgingLayer::hedging_delay_with])
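//!
//! The launch schedule implied by these three modes can be sketched as follows. `DelayPolicy`
//! and `launch_offsets` are hypothetical names, not part of this crate. The sketch assumes that
//! each delay is measured from the previous launch and that no acceptable result arrives early
//! enough to stop further launches.
//!
//! ```rust
//! use std::time::Duration;
//!
//! // Hypothetical delay policy mirroring the three modes described above.
//! enum DelayPolicy {
//!     // Every attempt launches immediately.
//!     Zero,
//!     // Each hedging attempt waits the same delay after the previous launch.
//!     Fixed(Duration),
//!     // The delay is computed from the input and the attempt index (1, 2, ...).
//!     Dynamic(fn(&str, u32) -> Duration),
//! }
//!
//! // Returns the offset from the start of the call at which each attempt launches,
//! // assuming no acceptable result arrives in the meantime. Index 0 is the original
//! // request, which always starts immediately.
//! fn launch_offsets(policy: &DelayPolicy, input: &str, max_hedged_attempts: u32) -> Vec<Duration> {
//!     let mut offsets = vec![Duration::ZERO];
//!     let mut elapsed = Duration::ZERO;
//!     for index in 1..=max_hedged_attempts {
//!         let delay = match policy {
//!             DelayPolicy::Zero => Duration::ZERO,
//!             DelayPolicy::Fixed(d) => *d,
//!             DelayPolicy::Dynamic(f) => f(input, index),
//!         };
//!         elapsed += delay;
//!         offsets.push(elapsed);
//!     }
//!     offsets
//! }
//! ```
//!
//! With a fixed 500 ms delay and two hedged attempts, the sketch yields launch offsets of 0 ms,
//! 500 ms, and 1000 ms. Under the same assumption, the `100 * index` millisecond delay used in
//! the advanced example below gives offsets of 0, 100, 300, and 600 ms.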
//!
//! The first result classified as non-recoverable (via the recovery callback) is returned
//! immediately. Any remaining in-flight requests are cancelled.
//! [`RecoveryKind::Unknown`][crate::RecoveryKind::Unknown] is treated as non-recoverable
//! and accepted immediately.
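//!
//! The selection rule can be modeled deterministically by ordering attempts by completion time
//! and taking the first accepted result. The `Recovery` enum below is a hypothetical stand-in
//! for [`RecoveryKind`][crate::RecoveryKind]. Treating `Unavailable` as accepted unless
//! `handle_unavailable` is set follows the defaults table later in this module; falling back to
//! the last result when every attempt is recoverable is an assumption of this sketch.
//!
//! ```rust
//! // Hypothetical stand-in for the crate's recovery classification.
//! #[derive(Clone, Copy, Debug, PartialEq)]
//! enum Recovery {
//!     Never,
//!     Retry,
//!     Unavailable,
//!     Unknown,
//! }
//!
//! // Decides whether a classified result ends the hedged call.
//! fn is_accepted(kind: Recovery, handle_unavailable: bool) -> bool {
//!     match kind {
//!         Recovery::Retry => false,
//!         // Unavailable results are returned immediately unless handling is enabled.
//!         Recovery::Unavailable => !handle_unavailable,
//!         // `Never` and `Unknown` are both accepted immediately.
//!         Recovery::Never | Recovery::Unknown => true,
//!     }
//! }
//!
//! // `attempts` holds (finish time in ms, result) for each launched attempt.
//! // Returns the first accepted result in completion order; if none is accepted,
//! // this sketch falls back to the last result to finish (an assumption).
//! fn pick_result<T: Clone>(
//!     attempts: &[(u64, T)],
//!     classify: impl Fn(&T) -> Recovery,
//!     handle_unavailable: bool,
//! ) -> Option<T> {
//!     let mut by_finish: Vec<&(u64, T)> = attempts.iter().collect();
//!     by_finish.sort_by_key(|(finish, _)| *finish);
//!     by_finish
//!         .iter()
//!         .find(|(_, result)| is_accepted(classify(result), handle_unavailable))
//!         .or(by_finish.last())
//!         .map(|(_, result)| result.clone())
//! }
//! ```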
//!
//! # When to Use Hedging
//!
//! Hedging increases overall load on the downstream service because it sends additional
//! concurrent requests. The downstream service **must be able to handle this extra load**
//! without degradation. If the downstream service is already under pressure, hedging can
//! amplify the problem rather than reduce latency.
//!
//! If the downstream service cannot tolerate additional load, prefer [`retry`][crate::retry]
//! instead: it only sends a new request after the previous one has completed or failed,
//! keeping the total request rate bounded.
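//!
//! As a rough illustration of the extra load, the hypothetical helper below counts how many
//! requests one hedged call sends downstream when the original request eventually succeeds.
//! It assumes a fixed delay between launches and that no hedged attempt finishes before the
//! original; a retry that succeeds on the first try would send exactly one request.
//!
//! ```rust
//! use std::time::Duration;
//!
//! // Attempt `k` launches at `delay * k` (k = 0 is the original request). It is
//! // sent only if that launch time comes strictly before the original request's
//! // acceptable result arrives at `original_latency`.
//! fn requests_sent(original_latency: Duration, delay: Duration, max_hedged_attempts: u32) -> u32 {
//!     (0..=max_hedged_attempts)
//!         .filter(|&k| delay * k < original_latency)
//!         .count() as u32
//! }
//! ```
//!
//! For example, with a 1200 ms response time, a 500 ms delay, and up to three hedged attempts,
//! three requests reach the downstream service for a single logical call.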
//!
//! # Configuration
//!
//! The [`HedgingLayer`] uses a type-state pattern to enforce that all required properties are
//! configured before the layer can be built. This compile-time check ensures that you cannot
//! accidentally create a hedging layer without specifying both input cloning and recovery logic:
//!
//! - [`clone_input_with`][HedgingLayer::clone_input_with]: Required function to clone inputs for
//! hedged attempts (Rust ownership requirement)
//! - [`recovery`][HedgingLayer::recovery]: Required function to classify whether an output is
//! acceptable
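//!
//! A stripped-down version of the type-state pattern looks like this. `LayerBuilder`, `Missing`,
//! `Set`, and `BuiltLayer` are hypothetical names; the real layer also stores the cloning and
//! recovery functions, which this sketch omits so that only the state tracking remains.
//!
//! ```rust
//! use std::marker::PhantomData;
//!
//! // Marker types recording whether a required property has been configured.
//! struct Missing;
//! struct Set;
//!
//! // Hypothetical builder: `C` tracks input cloning, `R` tracks recovery.
//! struct LayerBuilder<C, R> {
//!     name: &'static str,
//!     _state: PhantomData<(C, R)>,
//! }
//!
//! // The finished layer, reachable only once both properties are `Set`.
//! struct BuiltLayer {
//!     name: &'static str,
//! }
//!
//! impl LayerBuilder<Missing, Missing> {
//!     fn new(name: &'static str) -> Self {
//!         LayerBuilder { name, _state: PhantomData }
//!     }
//! }
//!
//! impl<R> LayerBuilder<Missing, R> {
//!     // Configuring cloning flips `C` from `Missing` to `Set`.
//!     fn clone_input(self) -> LayerBuilder<Set, R> {
//!         LayerBuilder { name: self.name, _state: PhantomData }
//!     }
//! }
//!
//! impl<C> LayerBuilder<C, Missing> {
//!     // Configuring recovery flips `R` from `Missing` to `Set`.
//!     fn recovery(self) -> LayerBuilder<C, Set> {
//!         LayerBuilder { name: self.name, _state: PhantomData }
//!     }
//! }
//!
//! impl LayerBuilder<Set, Set> {
//!     // `build` exists only here, so `LayerBuilder::new("x").clone_input().build()`
//!     // is rejected at compile time.
//!     fn build(self) -> BuiltLayer {
//!         BuiltLayer { name: self.name }
//!     }
//! }
//! ```
//!
//! Because each setter consumes the builder and returns a new type, the required properties can
//! be supplied in either order, and forgetting one surfaces as a missing-method error.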
//!
//! Each hedging layer requires an identifier for telemetry purposes. This identifier should follow
//! the `snake_case` naming convention to stay consistent across the codebase.
//!
//! # Thread Safety
//!
//! The [`Hedging`] type is `Send` and `Sync`, as required by the `Service` trait it implements.
//! The middleware can therefore be shared safely across threads and used in concurrent
//! environments.
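//!
//! A generic helper with `Send + Sync` bounds is a common way to check this property at compile
//! time, and `Arc` is the usual way to share such a value between threads. `HedgingLike` below is
//! a hypothetical stand-in rather than the real [`Hedging`] type.
//!
//! ```rust
//! use std::sync::Arc;
//! use std::thread;
//!
//! // Compiles only when `T` is both `Send` and `Sync`.
//! fn assert_send_sync<T: Send + Sync>() {}
//!
//! // Hypothetical stand-in for a middleware holding shared configuration.
//! struct HedgingLike {
//!     max_hedged_attempts: u32,
//! }
//!
//! // Shares one instance across `threads` worker threads through an `Arc`
//! // and sums the value each thread reads.
//! fn share_across_threads(service: HedgingLike, threads: u32) -> u32 {
//!     let service = Arc::new(service);
//!     let handles: Vec<_> = (0..threads)
//!         .map(|_| {
//!             let service = Arc::clone(&service);
//!             thread::spawn(move || service.max_hedged_attempts)
//!         })
//!         .collect();
//!     handles.into_iter().map(|h| h.join().unwrap()).sum()
//! }
//! ```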
//!
//! # Defaults
//!
//! The hedging middleware uses the following default values when optional configuration is not
//! provided:
//!
//! | Parameter | Default Value | Description | Configured By |
//! |-----------|---------------|-------------|---------------|
//! | Max hedged attempts | `1` (2 total) | Additional hedging requests beyond the original | [`max_hedged_attempts`][HedgingLayer::max_hedged_attempts] |
//! | Hedging delay | `500ms` | Wait 500 milliseconds before each hedging attempt | [`hedging_delay`][HedgingLayer::hedging_delay], [`hedging_delay_with`][HedgingLayer::hedging_delay_with] |
//! | Handle unavailable | `false` | Unavailable responses are returned immediately | [`handle_unavailable`][HedgingLayer::handle_unavailable] |
//! | Enable condition | Always enabled | Hedging is applied to all requests | [`enable_if`][HedgingLayer::enable_if], [`enable_always`][HedgingLayer::enable_always], [`disable`][HedgingLayer::disable] |
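//!
//! The defaults above can be summarized as a plain options struct. `HedgingOptions` is a
//! hypothetical type for illustration only (it is not the crate's `HedgingConfig`), and the
//! configurable enable condition is reduced to a `bool`.
//!
//! ```rust
//! use std::time::Duration;
//!
//! // Hypothetical options mirroring the defaults table.
//! #[derive(Debug, Clone, PartialEq)]
//! struct HedgingOptions {
//!     max_hedged_attempts: u32,
//!     hedging_delay: Duration,
//!     handle_unavailable: bool,
//!     // Simplification: the real enable condition is configurable per request.
//!     enabled: bool,
//! }
//!
//! impl Default for HedgingOptions {
//!     fn default() -> Self {
//!         Self {
//!             max_hedged_attempts: 1,
//!             hedging_delay: Duration::from_millis(500),
//!             handle_unavailable: false,
//!             enabled: true,
//!         }
//!     }
//! }
//!
//! impl HedgingOptions {
//!     // Total attempts per call: the original request plus the hedged ones.
//!     fn total_attempts(&self) -> u32 {
//!         1 + self.max_hedged_attempts
//!     }
//! }
//! ```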
//!
//! # Telemetry
//!
//! Hedging telemetry is **drop-safe**: it is emitted even for in-flight futures that are
//! cancelled when an acceptable result arrives. Each launched attempt carries a telemetry
//! guard that reports on drop.
//!
//! Telemetry is emitted when:
//! - A result is **recoverable**: reported with the actual [`RecoveryKind`][crate::RecoveryKind]
//! - A future is **abandoned** (dropped before completing): reported with recovery kind `"abandoned"`
//!
//! Telemetry is **not** emitted for non-recoverable (accepted) results.
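//!
//! The drop-safe behavior can be sketched with a guard whose `Drop` implementation reports the
//! attempt. `AttemptGuard`, `Outcome`, and `Sink` are hypothetical names, and the sink uses
//! `Rc<RefCell<_>>` for single-threaded simplicity; an async implementation would likely need a
//! thread-safe sink.
//!
//! ```rust
//! use std::cell::RefCell;
//! use std::rc::Rc;
//!
//! // Hypothetical telemetry sink collecting (attempt index, recovery kind) pairs.
//! type Sink = Rc<RefCell<Vec<(u32, &'static str)>>>;
//!
//! enum Outcome {
//!     // The attempt has not completed yet.
//!     Pending,
//!     // The attempt completed with a recoverable result of the given kind.
//!     Recoverable(&'static str),
//!     // The attempt's result was accepted.
//!     Accepted,
//! }
//!
//! // Carried by each launched attempt; reports when dropped.
//! struct AttemptGuard {
//!     index: u32,
//!     outcome: Outcome,
//!     sink: Sink,
//! }
//!
//! impl AttemptGuard {
//!     fn new(index: u32, sink: &Sink) -> Self {
//!         Self { index, outcome: Outcome::Pending, sink: Rc::clone(sink) }
//!     }
//!
//!     fn complete(&mut self, outcome: Outcome) {
//!         self.outcome = outcome;
//!     }
//! }
//!
//! impl Drop for AttemptGuard {
//!     fn drop(&mut self) {
//!         let kind = match self.outcome {
//!             // Dropped before completing: the attempt was abandoned.
//!             Outcome::Pending => Some("abandoned"),
//!             Outcome::Recoverable(kind) => Some(kind),
//!             // Accepted results emit no telemetry.
//!             Outcome::Accepted => None,
//!         };
//!         if let Some(kind) = kind {
//!             self.sink.borrow_mut().push((self.index, kind));
//!         }
//!     }
//! }
//! ```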
//!
//! ## Metrics
//!
//! - **Metric**: `resilience.event` (counter)
//! - **When**: Emitted for recoverable and abandoned attempts
//! - **Attributes**:
//! - `resilience.pipeline.name`: Pipeline identifier from [`ResilienceContext::name`][crate::ResilienceContext::name]
//! - `resilience.strategy.name`: Hedging identifier from [`Hedging::layer`]
//! - `resilience.event.name`: Always `hedging`
//! - `resilience.attempt.index`: Attempt index (0 for the original request, 1 or higher for hedging attempts)
//! - `resilience.attempt.is_last`: Whether this is the last attempt
//! - `resilience.attempt.recovery.kind`: The recovery classification (`retry`, `unavailable`, or `abandoned`)
//!
//! ## Logs
//!
//! Log events include all metric attributes plus `resilience.hedging.delay` (the delay
//! in seconds waited before launching the attempt).
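//!
//! The sketch below flattens one event into the documented attribute names. `HedgingEvent` is a
//! hypothetical type, and deriving `resilience.attempt.is_last` by comparing the attempt index
//! with the maximum number of hedged attempts is an assumption about how "last" is determined.
//!
//! ```rust
//! use std::time::Duration;
//!
//! // Hypothetical event carrying the data behind the metric and log attributes.
//! struct HedgingEvent<'a> {
//!     pipeline_name: &'a str,
//!     strategy_name: &'a str,
//!     attempt_index: u32,
//!     max_hedged_attempts: u32,
//!     recovery_kind: &'a str,
//!     delay: Duration,
//! }
//!
//! impl HedgingEvent<'_> {
//!     // Builds key/value pairs using the attribute names listed above.
//!     fn attributes(&self) -> Vec<(&'static str, String)> {
//!         vec![
//!             ("resilience.pipeline.name", self.pipeline_name.to_string()),
//!             ("resilience.strategy.name", self.strategy_name.to_string()),
//!             ("resilience.event.name", "hedging".to_string()),
//!             ("resilience.attempt.index", self.attempt_index.to_string()),
//!             // Assumption: the last attempt is the one with the highest index.
//!             ("resilience.attempt.is_last", (self.attempt_index == self.max_hedged_attempts).to_string()),
//!             ("resilience.attempt.recovery.kind", self.recovery_kind.to_string()),
//!             // Logs only: the delay, in seconds, waited before launching this attempt.
//!             ("resilience.hedging.delay", self.delay.as_secs_f64().to_string()),
//!         ]
//!     }
//! }
//! ```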
//!
//! # Examples
//!
//! ## Basic Usage
//!
//! This example shows how to configure and use the hedging middleware with only the required settings.
//!
//! ```rust
//! # use std::time::Duration;
//! # use tick::Clock;
//! # use layered::{Execute, Service, Stack};
//! # use seatbelt::hedging::Hedging;
//! # use seatbelt::{RecoveryInfo, ResilienceContext};
//! # async fn example(clock: Clock) -> Result<(), String> {
//! let context = ResilienceContext::new(&clock).name("example");
//!
//! let stack = (
//!     Hedging::layer("my_hedging", &context)
//!         // Required: how to clone inputs for hedged attempts
//!         .clone_input()
//!         // Required: determine if we should keep waiting for hedging attempts
//!         .recovery_with(|output: &Result<String, String>, _args| match output {
//!             Ok(_) => RecoveryInfo::never(),
//!             Err(msg) if msg.contains("transient") => RecoveryInfo::retry(),
//!             Err(_) => RecoveryInfo::never(),
//!         }),
//!     Execute::new(execute_unreliable_operation),
//! );
//!
//! let service = stack.into_service();
//! let result = service.execute("test input".to_string()).await;
//! # let _result = result;
//! # Ok(())
//! # }
//! # async fn execute_unreliable_operation(input: String) -> Result<String, String> { Ok(input) }
//! ```
//!
//! ## Advanced Usage
//!
//! This example demonstrates advanced usage of the hedging middleware, including dynamic
//! delays and on-execute callbacks.
//!
//! ```rust
//! # use std::time::Duration;
//! # use tick::Clock;
//! # use layered::{Execute, Stack, Service};
//! # use seatbelt::hedging::Hedging;
//! # use seatbelt::{RecoveryInfo, ResilienceContext};
//! # async fn example(clock: Clock) -> Result<(), String> {
//! let context = ResilienceContext::new(&clock);
//!
//! let stack = (
//!     Hedging::layer("advanced_hedging", &context)
//!         .clone_input()
//!         .recovery_with(|output: &Result<String, String>, _args| match output {
//!             Ok(_) => RecoveryInfo::never(),
//!             Err(_) => RecoveryInfo::retry(),
//!         })
//!         // Optional configuration
//!         .max_hedged_attempts(3)
//!         .hedging_delay_with(|_input, args| {
//!             Duration::from_millis(100 * u64::from(args.attempt().index()))
//!         })
//!         // Callback called just before each execute operation
//!         .on_execute(|_input, args| {
//!             println!("launching attempt: {}", args.attempt());
//!         }),
//!     Execute::new(execute_unreliable_operation),
//! );
//!
//! let service = stack.into_service();
//! let result = service.execute("test input".to_string()).await;
//! # let _result = result;
//! # Ok(())
//! # }
//! # async fn execute_unreliable_operation(input: String) -> Result<String, String> { Ok(input) }
//! ```
pub use crate::Attempt;
pub use ;
pub use HedgingConfig;
pub use HedgingLayer;
pub use Hedging;
pub use HedgingFuture;