1use std::time::Duration;
7
8use crate::error::TransportError;
9use crate::method_safety;
10use crate::request::{JsonRpcRequest, JsonRpcResponse};
11use crate::transport::RpcTransport;
12
13#[derive(Debug, Clone)]
15pub struct HedgingConfig {
16 pub hedge_delay: Duration,
18}
19
20impl Default for HedgingConfig {
21 fn default() -> Self {
22 Self {
23 hedge_delay: Duration::from_millis(100),
24 }
25 }
26}
27
28pub async fn hedged_send(
33 primary: &dyn RpcTransport,
34 backup: &dyn RpcTransport,
35 req: JsonRpcRequest,
36 config: &HedgingConfig,
37) -> Result<JsonRpcResponse, TransportError> {
38 if !method_safety::is_safe_to_retry(&req.method) {
40 return primary.send(req).await;
41 }
42
43 let req_clone = req.clone();
44 let delay = config.hedge_delay;
45
46 tokio::select! {
48 result = primary.send(req) => {
49 result
50 }
51 result = async {
52 tokio::time::sleep(delay).await;
53 backup.send(req_clone).await
54 } => {
55 result
56 }
57 }
58}
59
60#[cfg(test)]
61mod tests {
62 use super::*;
63 use crate::request::RpcId;
64 use async_trait::async_trait;
65
66 struct DelayTransport {
67 delay: Duration,
68 label: String,
69 }
70
71 #[async_trait]
72 impl RpcTransport for DelayTransport {
73 async fn send(&self, _req: JsonRpcRequest) -> Result<JsonRpcResponse, TransportError> {
74 tokio::time::sleep(self.delay).await;
75 Ok(JsonRpcResponse {
76 jsonrpc: "2.0".into(),
77 id: RpcId::Number(1),
78 result: Some(serde_json::json!(self.label)),
79 error: None,
80 })
81 }
82 fn url(&self) -> &str {
83 &self.label
84 }
85 }
86
87 #[tokio::test]
88 async fn primary_wins_when_fast() {
89 let primary = DelayTransport {
90 delay: Duration::from_millis(10),
91 label: "primary".into(),
92 };
93 let backup = DelayTransport {
94 delay: Duration::from_millis(10),
95 label: "backup".into(),
96 };
97
98 let config = HedgingConfig {
99 hedge_delay: Duration::from_millis(200), };
101
102 let req = JsonRpcRequest::auto("eth_blockNumber", vec![]);
103 let resp = hedged_send(&primary, &backup, req, &config).await.unwrap();
104 assert_eq!(resp.result.unwrap().as_str().unwrap(), "primary");
105 }
106
107 #[tokio::test]
108 async fn backup_wins_when_primary_slow() {
109 let primary = DelayTransport {
110 delay: Duration::from_millis(500), label: "primary".into(),
112 };
113 let backup = DelayTransport {
114 delay: Duration::from_millis(10), label: "backup".into(),
116 };
117
118 let config = HedgingConfig {
119 hedge_delay: Duration::from_millis(50),
120 };
121
122 let req = JsonRpcRequest::auto("eth_blockNumber", vec![]);
123 let resp = hedged_send(&primary, &backup, req, &config).await.unwrap();
124 assert_eq!(resp.result.unwrap().as_str().unwrap(), "backup");
126 }
127
128 #[tokio::test]
129 async fn no_hedging_for_writes() {
130 let primary = DelayTransport {
131 delay: Duration::from_millis(500),
132 label: "primary".into(),
133 };
134 let backup = DelayTransport {
135 delay: Duration::from_millis(10),
136 label: "backup".into(),
137 };
138
139 let config = HedgingConfig {
140 hedge_delay: Duration::from_millis(50),
141 };
142
143 let req = JsonRpcRequest::auto("eth_sendRawTransaction", vec![]);
145 let resp = hedged_send(&primary, &backup, req, &config).await.unwrap();
146 assert_eq!(resp.result.unwrap().as_str().unwrap(), "primary");
148 }
149}