// alloy_transport/layers/fallback.rs

1use crate::time::Instant;
2use alloy_json_rpc::{RequestPacket, ResponsePacket};
3use core::time::Duration;
4use derive_more::{Deref, DerefMut};
5use futures::{stream::FuturesUnordered, StreamExt};
6use parking_lot::RwLock;
7use std::{
8    collections::VecDeque,
9    fmt::Debug,
10    num::NonZeroUsize,
11    sync::Arc,
12    task::{Context, Poll},
13};
14use tower::{Layer, Service};
15use tracing::trace;
16
17use crate::{TransportError, TransportErrorKind, TransportFut};
18
// Constants for the transport ranking algorithm

/// Fraction of a transport's score derived from its recent success rate.
const STABILITY_WEIGHT: f64 = 0.7;
/// Fraction of a transport's score derived from its recent average latency.
const LATENCY_WEIGHT: f64 = 0.3;
/// Number of recent samples kept in each per-transport metrics window.
const DEFAULT_SAMPLE_COUNT: usize = 10;
/// Default number of transports queried in parallel for each request.
const DEFAULT_ACTIVE_TRANSPORT_COUNT: usize = 3;
24
/// The [`FallbackService`] consumes multiple transports and is able to
/// query them in parallel, returning the first successful response.
///
/// The service ranks transports based on latency and stability metrics,
/// and will attempt to always use the best available transports.
#[derive(Debug, Clone)]
pub struct FallbackService<S> {
    /// The list of transports to use. Wrapped in an `Arc` so that clones of
    /// the service share the same transports (and thus the same metrics).
    transports: Arc<Vec<ScoredTransport<S>>>,
    /// The maximum number of transports to use in parallel
    active_transport_count: usize,
}
37
38impl<S: Clone> FallbackService<S> {
39    /// Create a new fallback service from a list of transports.
40    ///
41    /// The `active_transport_count` parameter controls how many transports are used for requests
42    /// at any one time.
43    pub fn new(transports: Vec<S>, active_transport_count: usize) -> Self {
44        let scored_transports = transports
45            .into_iter()
46            .enumerate()
47            .map(|(id, transport)| ScoredTransport::new(id, transport))
48            .collect::<Vec<_>>();
49
50        Self { transports: Arc::new(scored_transports), active_transport_count }
51    }
52
53    /// Log the current ranking of transports
54    fn log_transport_rankings(&self)
55    where
56        S: Debug,
57    {
58        let mut transports = (*self.transports).clone();
59        transports.sort_by(|a, b| b.cmp(a));
60
61        trace!("Current transport rankings:");
62        for (idx, transport) in transports.iter().enumerate() {
63            trace!("  #{}: Transport[{}] - {}", idx + 1, transport.id, transport.metrics_summary());
64        }
65    }
66}
67
68impl<S> FallbackService<S>
69where
70    S: Service<RequestPacket, Future = TransportFut<'static>, Error = TransportError>
71        + Send
72        + Clone
73        + Debug
74        + 'static,
75{
76    /// Make a request to the fallback service middleware.
77    ///
78    /// Here is a high-level overview of how requests are handled:
79    ///
80    /// - At the start of each request, we sort transports by score
81    /// - We take the top `self.active_transport_count` and call them in parallel
82    /// - If any of them succeeds, we update the transport scores and return the response
83    /// - If all transports fail, we update the scores and return the last error that occurred
84    ///
85    /// This strategy allows us to always make requests to the best available transports
86    /// while keeping them available.
87    async fn make_request(&self, req: RequestPacket) -> Result<ResponsePacket, TransportError> {
88        // Get the top transports to use for this request
89        let top_transports = {
90            // Clone the vec, sort it, and take the top `self.active_transport_count`
91            let mut transports_clone = (*self.transports).clone();
92            transports_clone.sort_by(|a, b| b.cmp(a));
93            transports_clone.into_iter().take(self.active_transport_count).collect::<Vec<_>>()
94        };
95
96        // Create a collection of future requests
97        let mut futures = FuturesUnordered::new();
98
99        // Launch requests to all active transports in parallel
100        for transport in top_transports {
101            let req_clone = req.clone();
102            let mut transport_clone = transport.clone();
103
104            let future = async move {
105                let start = Instant::now();
106                let result = transport_clone.call(req_clone).await;
107                trace!(
108                    "Transport[{}] completed: latency={:?}, status={}",
109                    transport_clone.id,
110                    start.elapsed(),
111                    if result.is_ok() { "success" } else { "fail" }
112                );
113
114                (result, transport_clone, start.elapsed())
115            };
116
117            futures.push(future);
118        }
119
120        // Wait for the first successful response or until all fail
121        let mut last_error = None;
122
123        while let Some((result, transport, duration)) = futures.next().await {
124            match result {
125                Ok(response) => {
126                    // Record success
127                    transport.track_success(duration);
128
129                    self.log_transport_rankings();
130
131                    return Ok(response);
132                }
133                Err(error) => {
134                    // Record failure
135                    transport.track_failure();
136
137                    last_error = Some(error);
138                }
139            }
140        }
141
142        Err(last_error.unwrap_or_else(|| {
143            TransportErrorKind::custom_str("All transport futures failed to complete")
144        }))
145    }
146}
147
148impl<S> Service<RequestPacket> for FallbackService<S>
149where
150    S: Service<RequestPacket, Future = TransportFut<'static>, Error = TransportError>
151        + Send
152        + Sync
153        + Clone
154        + Debug
155        + 'static,
156{
157    type Response = ResponsePacket;
158    type Error = TransportError;
159    type Future = TransportFut<'static>;
160
161    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
162        // Service is always ready
163        Poll::Ready(Ok(()))
164    }
165
166    fn call(&mut self, req: RequestPacket) -> Self::Future {
167        let this = self.clone();
168        Box::pin(async move { this.make_request(req).await })
169    }
170}
171
/// Fallback layer for transparent transport failover. This layer will
/// consume a list of transports to provide better availability and
/// reliability.
///
/// The [`FallbackService`] will attempt to make requests to multiple
/// transports in parallel, and return the first successful response.
///
/// If all transports fail, the fallback service will return an error.
///
/// # Automatic Transport Ranking
///
/// Each transport is automatically ranked based on latency & stability
/// using a weighted algorithm. By default:
///
/// - Stability (success rate) is weighted at 70%
/// - Latency (response time) is weighted at 30%
/// - The `active_transport_count` parameter controls how many transports are queried at any one
///   time.
#[derive(Debug, Clone)]
pub struct FallbackLayer {
    /// The maximum number of transports to use in parallel
    active_transport_count: usize,
}
195
196impl FallbackLayer {
197    /// Set the number of active transports to use (must be greater than 0)
198    pub const fn with_active_transport_count(mut self, count: NonZeroUsize) -> Self {
199        self.active_transport_count = count.get();
200        self
201    }
202}
203
impl<S> Layer<Vec<S>> for FallbackLayer
where
    S: Service<RequestPacket, Future = TransportFut<'static>, Error = TransportError>
        + Send
        + Clone
        + Debug
        + 'static,
{
    type Service = FallbackService<S>;

    fn layer(&self, inner: Vec<S>) -> Self::Service {
        // Wrap the provided transports in a fallback service using this
        // layer's configured parallelism.
        FallbackService::new(inner, self.active_transport_count)
    }
}
218
impl Default for FallbackLayer {
    fn default() -> Self {
        // By default, query up to DEFAULT_ACTIVE_TRANSPORT_COUNT (3) transports in parallel.
        Self { active_transport_count: DEFAULT_ACTIVE_TRANSPORT_COUNT }
    }
}
224
/// A scored transport that can be ordered in a heap.
///
/// The transport is scored every time it is used according to
/// a simple weighted algorithm that favors latency and stability.
///
/// The score is calculated as follows (by default):
///
/// - Stability (success rate) is weighted at 70%
/// - Latency (response time) is weighted at 30%
///
/// The score is then used to determine which transport to use next in
/// the [`FallbackService`].
#[derive(Debug, Clone, Deref, DerefMut)]
struct ScoredTransport<S> {
    /// The transport itself
    #[deref]
    #[deref_mut]
    transport: S,
    /// Unique identifier for the transport
    id: usize,
    /// Metrics for the transport. Arc-shared so that clones of this
    /// transport (e.g. per-request copies) update the same metrics.
    metrics: Arc<RwLock<TransportMetrics>>,
}
248
249impl<S> ScoredTransport<S> {
250    /// Create a new scored transport
251    fn new(id: usize, transport: S) -> Self {
252        Self { id, transport, metrics: Arc::new(Default::default()) }
253    }
254
255    /// Returns the current score of the transport based on the weighted algorithm.
256    fn score(&self) -> f64 {
257        let metrics = self.metrics.read();
258        metrics.calculate_score()
259    }
260
261    /// Get metrics summary for debugging
262    fn metrics_summary(&self) -> String {
263        let metrics = self.metrics.read();
264        metrics.get_summary()
265    }
266
267    /// Track a successful request and its latency.
268    fn track_success(&self, duration: Duration) {
269        let mut metrics = self.metrics.write();
270        metrics.track_success(duration);
271    }
272
273    /// Track a failed request.
274    fn track_failure(&self) {
275        let mut metrics = self.metrics.write();
276        metrics.track_failure();
277    }
278}
279
280impl<S> PartialEq for ScoredTransport<S> {
281    fn eq(&self, other: &Self) -> bool {
282        self.score().eq(&other.score())
283    }
284}
285
286impl<S> Eq for ScoredTransport<S> {}
287
288#[expect(clippy::non_canonical_partial_ord_impl)]
289impl<S> PartialOrd for ScoredTransport<S> {
290    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
291        self.score().partial_cmp(&other.score())
292    }
293}
294
295impl<S> Ord for ScoredTransport<S> {
296    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
297        self.partial_cmp(other).unwrap_or(std::cmp::Ordering::Equal)
298    }
299}
300
/// Represents performance metrics for a transport.
#[derive(Debug)]
struct TransportMetrics {
    // Latency history - tracks last N responses
    latencies: VecDeque<Duration>,
    // Success history - tracks last N successes (true) or failures (false)
    successes: VecDeque<bool>,
    // Last time this transport was checked/used
    last_update: Instant,
    // Total number of requests made to this transport (lifetime counter,
    // not windowed; not used by `calculate_score`)
    total_requests: u64,
    // Total number of successful requests (lifetime counter, not windowed;
    // not used by `calculate_score`)
    successful_requests: u64,
}
315
316impl TransportMetrics {
317    /// Track a successful request and its latency.
318    fn track_success(&mut self, duration: Duration) {
319        self.total_requests += 1;
320        self.successful_requests += 1;
321        self.last_update = Instant::now();
322
323        // Add to sample windows
324        self.latencies.push_back(duration);
325        self.successes.push_back(true);
326
327        // Limit to sample count
328        while self.latencies.len() > DEFAULT_SAMPLE_COUNT {
329            self.latencies.pop_front();
330        }
331        while self.successes.len() > DEFAULT_SAMPLE_COUNT {
332            self.successes.pop_front();
333        }
334    }
335
336    /// Track a failed request.
337    fn track_failure(&mut self) {
338        self.total_requests += 1;
339        self.last_update = Instant::now();
340
341        // Add to sample windows (no latency for failures)
342        self.successes.push_back(false);
343
344        // Limit to sample count
345        while self.successes.len() > DEFAULT_SAMPLE_COUNT {
346            self.successes.pop_front();
347        }
348    }
349
350    /// Calculate weighted score based on stability and latency
351    fn calculate_score(&self) -> f64 {
352        // If no data yet, return initial neutral score
353        if self.successes.is_empty() {
354            return 0.0;
355        }
356
357        // Calculate stability score (percentage of successful requests)
358        let success_count = self.successes.iter().filter(|&&s| s).count();
359        let stability_score = success_count as f64 / self.successes.len() as f64;
360
361        // Calculate latency score (lower is better)
362        let latency_score = if !self.latencies.is_empty() {
363            let avg_latency = self.latencies.iter().map(|d| d.as_secs_f64()).sum::<f64>()
364                / self.latencies.len() as f64;
365
366            // Normalize latency score (1.0 for 0ms, approaches 0.0 as latency increases)
367            1.0 / (1.0 + avg_latency)
368        } else {
369            0.0
370        };
371
372        // Apply weights to calculate final score
373        (stability_score * STABILITY_WEIGHT) + (latency_score * LATENCY_WEIGHT)
374    }
375
376    /// Get a summary of metrics for debugging
377    fn get_summary(&self) -> String {
378        let success_rate = if !self.successes.is_empty() {
379            let success_count = self.successes.iter().filter(|&&s| s).count();
380            success_count as f64 / self.successes.len() as f64
381        } else {
382            0.0
383        };
384
385        let avg_latency = if !self.latencies.is_empty() {
386            self.latencies.iter().map(|d| d.as_secs_f64()).sum::<f64>()
387                / self.latencies.len() as f64
388        } else {
389            0.0
390        };
391
392        format!(
393            "success_rate: {:.2}%, avg_latency: {:.2}ms, samples: {}, score: {:.4}",
394            success_rate * 100.0,
395            avg_latency * 1000.0,
396            self.successes.len(),
397            self.calculate_score()
398        )
399    }
400}
401
402impl Default for TransportMetrics {
403    fn default() -> Self {
404        Self {
405            latencies: VecDeque::new(),
406            successes: VecDeque::new(),
407            last_update: Instant::now(),
408            total_requests: 0,
409            successful_requests: 0,
410        }
411    }
412}