1use futures::future::BoxFuture;
10use futures::FutureExt as _;
11use jsonrpc_core::Call;
12use serde_json::Value;
13use std::any::Any;
14use std::fmt::Debug;
15use std::future::Future;
16use std::sync::Arc;
17use web3::error::Error as Web3Error;
18use web3::{BatchTransport, RequestId, Transport};
19
20type BoxedFuture = BoxFuture<'static, Result<Value, Web3Error>>;
23type BoxedBatch = BoxFuture<'static, Result<Vec<Result<Value, Web3Error>>, Web3Error>>;
24
25trait TransportBoxed: Debug + Send + Sync + 'static {
28 fn prepare_boxed(&self, method: &str, params: Vec<Value>) -> (RequestId, Call);
30
31 fn send_boxed(&self, id: RequestId, request: Call) -> BoxedFuture;
33
34 fn execute_boxed(&self, method: &str, params: Vec<Value>) -> BoxedFuture;
36
37 fn send_batch_boxed(&self, requests: Vec<(RequestId, Call)>) -> BoxedBatch;
39
40 fn inner(&self) -> &(dyn Any + Send + Sync);
42}
43
44impl<F, B, T> TransportBoxed for T
45where
46 F: Future<Output = Result<Value, Web3Error>> + Send + 'static,
47 B: Future<Output = Result<Vec<Result<Value, Web3Error>>, Web3Error>> + Send + 'static,
48 T: Transport<Out = F> + BatchTransport<Batch = B> + Debug + Send + Sync + 'static,
49{
50 #[inline(always)]
51 fn prepare_boxed(&self, method: &str, params: Vec<Value>) -> (RequestId, Call) {
52 self.prepare(method, params)
53 }
54
55 #[inline(always)]
56 fn send_boxed(&self, id: RequestId, request: Call) -> BoxedFuture {
57 self.send(id, request).boxed()
58 }
59
60 #[inline(always)]
61 fn execute_boxed(&self, method: &str, params: Vec<Value>) -> BoxedFuture {
62 self.execute(method, params).boxed()
63 }
64
65 #[inline(always)]
66 fn send_batch_boxed(&self, requests: Vec<(RequestId, Call)>) -> BoxedBatch {
67 self.send_batch(requests).boxed()
68 }
69
70 #[inline(always)]
71 fn inner(&self) -> &(dyn Any + Send + Sync) {
72 self
73 }
74}
75
76#[derive(Debug)]
79pub struct DynTransport {
80 inner: Arc<dyn TransportBoxed>,
81}
82
83impl DynTransport {
84 pub fn new<F, B, T>(inner: T) -> Self
86 where
87 F: Future<Output = Result<Value, Web3Error>> + Send + 'static,
88 B: Future<Output = Result<Vec<Result<Value, Web3Error>>, Web3Error>> + Send + 'static,
89 T: Transport<Out = F> + BatchTransport<Batch = B> + Send + Sync + 'static,
90 {
91 let inner_ref: &dyn Any = &inner;
92 let inner_arc = match inner_ref.downcast_ref::<DynTransport>() {
93 Some(dyn_transport) => dyn_transport.inner.clone(),
97 None => Arc::new(inner),
98 };
99
100 DynTransport { inner: inner_arc }
101 }
102
103 pub fn downcast<T: Any + Send + Sync + 'static>(&self) -> Option<&T> {
105 self.inner.inner().downcast_ref()
106 }
107}
108
109impl Clone for DynTransport {
110 fn clone(&self) -> Self {
111 DynTransport {
112 inner: self.inner.clone(),
113 }
114 }
115}
116
117impl Transport for DynTransport {
118 type Out = BoxedFuture;
119
120 #[inline(always)]
121 fn prepare(&self, method: &str, params: Vec<Value>) -> (RequestId, Call) {
122 self.inner.prepare_boxed(method, params)
123 }
124
125 #[inline(always)]
126 fn send(&self, id: RequestId, request: Call) -> Self::Out {
127 self.inner.send_boxed(id, request)
128 }
129
130 #[inline(always)]
131 fn execute(&self, method: &str, params: Vec<Value>) -> Self::Out {
132 self.inner.execute_boxed(method, params)
133 }
134}
135
136impl BatchTransport for DynTransport {
137 type Batch = BoxedBatch;
138
139 fn send_batch<T>(&self, requests: T) -> Self::Batch
140 where
141 T: IntoIterator<Item = (RequestId, Call)>,
142 {
143 self.inner.send_batch_boxed(requests.into_iter().collect())
144 }
145}
146
147#[cfg(test)]
148mod tests {
149 use super::*;
150 use crate::test::prelude::*;
151
152 #[test]
153 fn dyn_transport() {
154 let mut transport = TestTransport::new();
155 let dyn_transport = DynTransport::new(transport.clone());
156
157 let (id, call) = dyn_transport.prepare("test", vec![json!(28)]);
159 transport.assert_request("test", &[json!(28)]);
160 transport.assert_no_more_requests();
161
162 transport.add_response(json!(true));
164 let response = dyn_transport.send(id, call).immediate().expect("success");
165 assert_eq!(response, json!(true));
166
167 dyn_transport
170 .execute("test", vec![json!(42)])
171 .immediate()
172 .expect_err("failed");
173 transport.assert_request("test", &[json!(42)]);
174 transport.assert_no_more_requests();
175 }
176
177 #[test]
178 #[allow(clippy::redundant_clone)]
179 fn dyn_transport_does_not_double_wrap() {
180 let transport = TestTransport::new();
181 let dyn_transport = DynTransport::new(transport);
182 assert_eq!(Arc::strong_count(&dyn_transport.inner), 1);
183
184 let dyn_dyn_transport = DynTransport::new(dyn_transport.clone());
185 assert_eq!(Arc::strong_count(&dyn_dyn_transport.inner), 2);
190 }
191
192 #[test]
193 fn dyn_transport_is_threadsafe() {
194 let transport = TestTransport::new();
195 let dyn_transport = DynTransport::new(transport);
196 std::thread::spawn(move || {
197 let _ = dyn_transport.prepare("test", vec![json!(28)]);
200 });
201 }
202
203 #[test]
204 fn dyn_transport_is_downcastable() {
205 let transport = TestTransport::new();
206
207 let dyn_transport = DynTransport::new(transport);
208 let concrete_transport: &TestTransport = dyn_transport.downcast().unwrap();
209 concrete_transport.prepare("test", vec![json!(28)]);
210
211 let dyn_dyn_transport = DynTransport::new(dyn_transport);
213 let concrete_transport: &TestTransport = dyn_dyn_transport.downcast().unwrap();
214 concrete_transport.prepare("test", vec![json!(28)]);
215 }
216}