tower_fault/latency/mod.rs
1//! # Latency injection for `tower`
2//!
3//! Layer that injects latency randomly into a service.
4//!
5//! ## Usage
6//!
7//! ```rust
8//! use tower_fault::latency::LatencyLayer;
9//! use tower::{service_fn, ServiceBuilder};
10//! # async fn my_service() -> Result<(), ()> {
11//! # Ok(())
12//! # }
13//!
14//! // Initialize a LatencyLayer with a 10% probability of injecting
15//! // 200 to 500 milliseconds of latency.
16//! let latency_layer = LatencyLayer::new(0.1, 200..500);
17//!
18//! let service = ServiceBuilder::new()
19//! .layer(latency_layer)
20//! .service(service_fn(my_service));
21//! ```
22//!
23//! ### Decider
24//!
25//! The __decider__ is used to determine if a latency should be injected
26//! or not. This can be a boolean, float, Bernoulli distribution, a
27//! closure, or a custom implementation of the [`Decider`] trait.
28//!
29//! For more information, see the [`decider`](crate::decider) module.
30//!
31//! ```rust
32//! use tower_fault::latency::LatencyLayer;
33//! # struct MyRequest { value: u64 };
34//!
35//! // Never inject latency.
36//! LatencyLayer::new(false, 200..500);
37//! // Always inject 200-500 ms of latency.
38//! LatencyLayer::new(true, 200..500);
39//!
40//! // Inject latency 30% of the time.
41//! LatencyLayer::new(0.3, 200..500);
42//!
43//! // Inject latency based on the request content.
44//! LatencyLayer::new(|req: &MyRequest| req.value % 2 == 0, 200..500);
45//! ```
46//!
47//! ### Distribution
48//!
49//! The latency __distribution__ is used to determine the duration of the
50//! latency injected in the service. The distribution can be a `Range`,
51//! `RangeInclusive`, static value, a closure, or a custom implementation
52//! of the [`Distribution`] trait.
53//!
54//! ```rust
55//! use tower_fault::latency::LatencyLayer;
56//! # struct MyRequest { value: u64 };
57//!
58//! // Latency between 200 and 500 milliseconds.
59//! LatencyLayer::new(0.3, 200..500);
60//! LatencyLayer::new(0.3, 200..=500);
61//!
62//! // Latency between 200 and 500 milliseconds, using floats for
63//! // extra precision.
64//! LatencyLayer::new(0.3, 200.0..500.0);
65//!
66//! // Fixed latency of 300 milliseconds.
67//! LatencyLayer::new(0.3, 300);
68//!
69//! // Closure that returns a latency based on the request content.
70//! LatencyLayer::new(0.3, |req: &MyRequest| req.value);
71//! ```
72//!
73
74use crate::decider::Decider;
75use std::{
76 future::Future,
77 marker::PhantomData,
78 pin::Pin,
79 task::{Context, Poll},
80};
81use tokio::time;
82use tower::{Layer, Service};
83
84mod distribution;
85pub use distribution::Distribution;
86
87/// Layer that randomly adds latency to the service.
88///
89/// __Note__: This does not add latency to the underlying service, but rather ensure
90/// that the service will have a minimal latency (set by the distribution) before
91/// returning a response.
92#[derive(Clone, Debug)]
93pub struct LatencyLayer<'a, De, Di> {
94 decider: De,
95 distribution: Di,
96 _phantom: PhantomData<&'a ()>,
97}
98
99impl<'a> LatencyLayer<'a, (), ()> {
100 /// Create a new `LatencyLayer` builder.
101 pub fn builder() -> Self {
102 Self {
103 decider: (),
104 distribution: (),
105 _phantom: PhantomData,
106 }
107 }
108}
109
110impl<'a, De, Di> LatencyLayer<'a, De, Di> {
111 /// Create a new `LatencyLayer` builder with the given probability
112 /// and latency distribution.
113 pub fn new(decider: De, distribution: Di) -> Self {
114 Self {
115 decider,
116 distribution,
117 _phantom: PhantomData,
118 }
119 }
120
121 /// Set the given decider to be used to determine if a latency
122 /// should be injected.
123 pub fn with_decider<NDe>(self, decider: NDe) -> LatencyLayer<'a, NDe, Di> {
124 LatencyLayer {
125 decider,
126 distribution: self.distribution,
127 _phantom: PhantomData,
128 }
129 }
130
131 /// Set the given latency distribution to set the latency.
132 pub fn with_distribution<NDi>(self, distribution: NDi) -> LatencyLayer<'a, De, NDi> {
133 LatencyLayer {
134 decider: self.decider,
135 distribution,
136 _phantom: PhantomData,
137 }
138 }
139}
140
141impl<'a, De, Di, S> Layer<S> for LatencyLayer<'a, De, Di>
142where
143 De: Clone,
144 Di: Clone,
145{
146 type Service = LatencyService<'a, De, Di, S>;
147
148 fn layer(&self, inner: S) -> Self::Service {
149 LatencyService {
150 inner,
151 decider: self.decider.clone(),
152 distribution: self.distribution.clone(),
153 _phantom: PhantomData,
154 }
155 }
156}
157
158/// Service that randomly injects latency into a service.
159#[derive(Clone, Debug)]
160pub struct LatencyService<'a, De, Di, S> {
161 inner: S,
162 decider: De,
163 distribution: Di,
164 _phantom: PhantomData<&'a ()>,
165}
166
167impl<'a, De, Di, S, R> Service<R> for LatencyService<'a, De, Di, S>
168where
169 De: Decider<R> + Clone,
170 Di: Distribution<R> + Clone,
171 S: Service<R> + Send,
172 S::Future: Send + 'a,
173{
174 type Response = S::Response;
175 type Error = S::Error;
176 type Future = LatencyFuture<'a, R, S>;
177
178 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
179 self.inner.poll_ready(cx)
180 }
181
182 fn call(&mut self, request: R) -> Self::Future {
183 let latency = if self.decider.decide(&request) {
184 Some(self.distribution.sample(&request))
185 } else {
186 None
187 };
188
189 let fut = self.inner.call(request);
190 Box::pin(async move {
191 if let Some(latency) = latency {
192 time::sleep(latency).await;
193 }
194 fut.await
195 })
196 }
197}
198
199type LatencyFuture<'a, R, S> = Pin<
200 Box<
201 dyn Future<Output = Result<<S as Service<R>>::Response, <S as Service<R>>::Error>>
202 + Send
203 + 'a,
204 >,
205>;