alloy_transport/layers/
fallback.rs1use crate::time::Instant;
2use alloy_json_rpc::{RequestPacket, ResponsePacket};
3use core::time::Duration;
4use derive_more::{Deref, DerefMut};
5use futures::{stream::FuturesUnordered, StreamExt};
6use parking_lot::RwLock;
7use std::{
8 collections::VecDeque,
9 fmt::Debug,
10 num::NonZeroUsize,
11 sync::Arc,
12 task::{Context, Poll},
13};
14use tower::{Layer, Service};
15use tracing::trace;
16
17use crate::{TransportError, TransportErrorKind, TransportFut};
18
19const STABILITY_WEIGHT: f64 = 0.7;
21const LATENCY_WEIGHT: f64 = 0.3;
22const DEFAULT_SAMPLE_COUNT: usize = 10;
23const DEFAULT_ACTIVE_TRANSPORT_COUNT: usize = 3;
24
25#[derive(Debug, Clone)]
31pub struct FallbackService<S> {
32 transports: Arc<Vec<ScoredTransport<S>>>,
34 active_transport_count: usize,
36}
37
38impl<S: Clone> FallbackService<S> {
39 pub fn new(transports: Vec<S>, active_transport_count: usize) -> Self {
44 let scored_transports = transports
45 .into_iter()
46 .enumerate()
47 .map(|(id, transport)| ScoredTransport::new(id, transport))
48 .collect::<Vec<_>>();
49
50 Self { transports: Arc::new(scored_transports), active_transport_count }
51 }
52
53 fn log_transport_rankings(&self)
55 where
56 S: Debug,
57 {
58 let mut transports = (*self.transports).clone();
59 transports.sort_by(|a, b| b.cmp(a));
60
61 trace!("Current transport rankings:");
62 for (idx, transport) in transports.iter().enumerate() {
63 trace!(" #{}: Transport[{}] - {}", idx + 1, transport.id, transport.metrics_summary());
64 }
65 }
66}
67
68impl<S> FallbackService<S>
69where
70 S: Service<RequestPacket, Future = TransportFut<'static>, Error = TransportError>
71 + Send
72 + Clone
73 + Debug
74 + 'static,
75{
76 async fn make_request(&self, req: RequestPacket) -> Result<ResponsePacket, TransportError> {
88 let top_transports = {
90 let mut transports_clone = (*self.transports).clone();
92 transports_clone.sort_by(|a, b| b.cmp(a));
93 transports_clone.into_iter().take(self.active_transport_count).collect::<Vec<_>>()
94 };
95
96 let mut futures = FuturesUnordered::new();
98
99 for transport in top_transports {
101 let req_clone = req.clone();
102 let mut transport_clone = transport.clone();
103
104 let future = async move {
105 let start = Instant::now();
106 let result = transport_clone.call(req_clone).await;
107 trace!(
108 "Transport[{}] completed: latency={:?}, status={}",
109 transport_clone.id,
110 start.elapsed(),
111 if result.is_ok() { "success" } else { "fail" }
112 );
113
114 (result, transport_clone, start.elapsed())
115 };
116
117 futures.push(future);
118 }
119
120 let mut last_error = None;
122
123 while let Some((result, transport, duration)) = futures.next().await {
124 match result {
125 Ok(response) => {
126 transport.track_success(duration);
128
129 self.log_transport_rankings();
130
131 return Ok(response);
132 }
133 Err(error) => {
134 transport.track_failure();
136
137 last_error = Some(error);
138 }
139 }
140 }
141
142 Err(last_error.unwrap_or_else(|| {
143 TransportErrorKind::custom_str("All transport futures failed to complete")
144 }))
145 }
146}
147
148impl<S> Service<RequestPacket> for FallbackService<S>
149where
150 S: Service<RequestPacket, Future = TransportFut<'static>, Error = TransportError>
151 + Send
152 + Sync
153 + Clone
154 + Debug
155 + 'static,
156{
157 type Response = ResponsePacket;
158 type Error = TransportError;
159 type Future = TransportFut<'static>;
160
161 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
162 Poll::Ready(Ok(()))
164 }
165
166 fn call(&mut self, req: RequestPacket) -> Self::Future {
167 let this = self.clone();
168 Box::pin(async move { this.make_request(req).await })
169 }
170}
171
172#[derive(Debug, Clone)]
191pub struct FallbackLayer {
192 active_transport_count: usize,
194}
195
196impl FallbackLayer {
197 pub const fn with_active_transport_count(mut self, count: NonZeroUsize) -> Self {
199 self.active_transport_count = count.get();
200 self
201 }
202}
203
204impl<S> Layer<Vec<S>> for FallbackLayer
205where
206 S: Service<RequestPacket, Future = TransportFut<'static>, Error = TransportError>
207 + Send
208 + Clone
209 + Debug
210 + 'static,
211{
212 type Service = FallbackService<S>;
213
214 fn layer(&self, inner: Vec<S>) -> Self::Service {
215 FallbackService::new(inner, self.active_transport_count)
216 }
217}
218
219impl Default for FallbackLayer {
220 fn default() -> Self {
221 Self { active_transport_count: DEFAULT_ACTIVE_TRANSPORT_COUNT }
222 }
223}
224
225#[derive(Debug, Clone, Deref, DerefMut)]
238struct ScoredTransport<S> {
239 #[deref]
241 #[deref_mut]
242 transport: S,
243 id: usize,
245 metrics: Arc<RwLock<TransportMetrics>>,
247}
248
249impl<S> ScoredTransport<S> {
250 fn new(id: usize, transport: S) -> Self {
252 Self { id, transport, metrics: Arc::new(Default::default()) }
253 }
254
255 fn score(&self) -> f64 {
257 let metrics = self.metrics.read();
258 metrics.calculate_score()
259 }
260
261 fn metrics_summary(&self) -> String {
263 let metrics = self.metrics.read();
264 metrics.get_summary()
265 }
266
267 fn track_success(&self, duration: Duration) {
269 let mut metrics = self.metrics.write();
270 metrics.track_success(duration);
271 }
272
273 fn track_failure(&self) {
275 let mut metrics = self.metrics.write();
276 metrics.track_failure();
277 }
278}
279
280impl<S> PartialEq for ScoredTransport<S> {
281 fn eq(&self, other: &Self) -> bool {
282 self.score().eq(&other.score())
283 }
284}
285
286impl<S> Eq for ScoredTransport<S> {}
287
288#[expect(clippy::non_canonical_partial_ord_impl)]
289impl<S> PartialOrd for ScoredTransport<S> {
290 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
291 self.score().partial_cmp(&other.score())
292 }
293}
294
295impl<S> Ord for ScoredTransport<S> {
296 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
297 self.partial_cmp(other).unwrap_or(std::cmp::Ordering::Equal)
298 }
299}
300
301#[derive(Debug)]
303struct TransportMetrics {
304 latencies: VecDeque<Duration>,
306 successes: VecDeque<bool>,
308 last_update: Instant,
310 total_requests: u64,
312 successful_requests: u64,
314}
315
316impl TransportMetrics {
317 fn track_success(&mut self, duration: Duration) {
319 self.total_requests += 1;
320 self.successful_requests += 1;
321 self.last_update = Instant::now();
322
323 self.latencies.push_back(duration);
325 self.successes.push_back(true);
326
327 while self.latencies.len() > DEFAULT_SAMPLE_COUNT {
329 self.latencies.pop_front();
330 }
331 while self.successes.len() > DEFAULT_SAMPLE_COUNT {
332 self.successes.pop_front();
333 }
334 }
335
336 fn track_failure(&mut self) {
338 self.total_requests += 1;
339 self.last_update = Instant::now();
340
341 self.successes.push_back(false);
343
344 while self.successes.len() > DEFAULT_SAMPLE_COUNT {
346 self.successes.pop_front();
347 }
348 }
349
350 fn calculate_score(&self) -> f64 {
352 if self.successes.is_empty() {
354 return 0.0;
355 }
356
357 let success_count = self.successes.iter().filter(|&&s| s).count();
359 let stability_score = success_count as f64 / self.successes.len() as f64;
360
361 let latency_score = if !self.latencies.is_empty() {
363 let avg_latency = self.latencies.iter().map(|d| d.as_secs_f64()).sum::<f64>()
364 / self.latencies.len() as f64;
365
366 1.0 / (1.0 + avg_latency)
368 } else {
369 0.0
370 };
371
372 (stability_score * STABILITY_WEIGHT) + (latency_score * LATENCY_WEIGHT)
374 }
375
376 fn get_summary(&self) -> String {
378 let success_rate = if !self.successes.is_empty() {
379 let success_count = self.successes.iter().filter(|&&s| s).count();
380 success_count as f64 / self.successes.len() as f64
381 } else {
382 0.0
383 };
384
385 let avg_latency = if !self.latencies.is_empty() {
386 self.latencies.iter().map(|d| d.as_secs_f64()).sum::<f64>()
387 / self.latencies.len() as f64
388 } else {
389 0.0
390 };
391
392 format!(
393 "success_rate: {:.2}%, avg_latency: {:.2}ms, samples: {}, score: {:.4}",
394 success_rate * 100.0,
395 avg_latency * 1000.0,
396 self.successes.len(),
397 self.calculate_score()
398 )
399 }
400}
401
402impl Default for TransportMetrics {
403 fn default() -> Self {
404 Self {
405 latencies: VecDeque::new(),
406 successes: VecDeque::new(),
407 last_update: Instant::now(),
408 total_requests: 0,
409 successful_requests: 0,
410 }
411 }
412}