use futures::future::BoxFuture;
use futures::FutureExt as _;
use jsonrpc_core::Call;
use serde_json::Value;
use std::any::Any;
use std::fmt::Debug;
use std::future::Future;
use std::sync::Arc;
use web3::error::Error as Web3Error;
use web3::{BatchTransport, RequestId, Transport};
/// Heap-allocated, type-erased future that resolves to a single JSON value or a
/// `web3` error.
type BoxedFuture = BoxFuture<'static, Result<Value, Web3Error>>;
/// Heap-allocated, type-erased future that resolves either to a vector of
/// individual results (each a JSON value or a `web3` error) or to one overall
/// `web3` error.
type BoxedBatch = BoxFuture<'static, Result<Vec<Result<Value, Web3Error>>, Web3Error>>;
/// Internal trait mirroring the `Transport` and `BatchTransport` operations with
/// boxed future return types, so that it can be used as `dyn TransportBoxed`
/// regardless of the concrete future types of the underlying transport.
trait TransportBoxed: Debug + Send + Sync + 'static {
/// Forwards to the underlying transport's `prepare`.
fn prepare_boxed(&self, method: &str, params: Vec<Value>) -> (RequestId, Call);
/// Forwards to the underlying transport's `send`, boxing the returned future.
fn send_boxed(&self, id: RequestId, request: Call) -> BoxedFuture;
/// Forwards to the underlying transport's `execute`, boxing the returned future.
fn execute_boxed(&self, method: &str, params: Vec<Value>) -> BoxedFuture;
/// Forwards to the underlying transport's `send_batch`, boxing the returned
/// future. Takes a `Vec` rather than a generic iterator, which keeps the
/// method free of type parameters.
fn send_batch_boxed(&self, requests: Vec<(RequestId, Call)>) -> BoxedBatch;
/// Exposes the implementor as `Any` so callers can attempt to downcast it
/// back to its concrete type.
fn inner(&self) -> &(dyn Any + Send + Sync);
}
impl<F, B, T> TransportBoxed for T
where
F: Future<Output = Result<Value, Web3Error>> + Send + 'static,
B: Future<Output = Result<Vec<Result<Value, Web3Error>>, Web3Error>> + Send + 'static,
T: Transport<Out = F> + BatchTransport<Batch = B> + Debug + Send + Sync + 'static,
{
#[inline(always)]
fn prepare_boxed(&self, method: &str, params: Vec<Value>) -> (RequestId, Call) {
self.prepare(method, params)
}
#[inline(always)]
fn send_boxed(&self, id: RequestId, request: Call) -> BoxedFuture {
self.send(id, request).boxed()
}
#[inline(always)]
fn execute_boxed(&self, method: &str, params: Vec<Value>) -> BoxedFuture {
self.execute(method, params).boxed()
}
#[inline(always)]
fn send_batch_boxed(&self, requests: Vec<(RequestId, Call)>) -> BoxedBatch {
self.send_batch(requests.into_iter()).boxed()
}
#[inline(always)]
fn inner(&self) -> &(dyn Any + Send + Sync) {
self
}
}
/// A type-erased transport: wraps any compatible transport behind a shared
/// `Arc<dyn TransportBoxed>` so code can hold a transport without being generic
/// over its concrete type.
#[derive(Debug)]
pub struct DynTransport {
/// The wrapped transport, shared by reference count between clones.
inner: Arc<dyn TransportBoxed>,
}
impl DynTransport {
    /// Wraps `inner` in a type-erased, reference-counted transport.
    ///
    /// If `inner` is itself a `DynTransport`, its existing shared handle is
    /// reused instead of wrapping it a second time.
    pub fn new<F, B, T>(inner: T) -> Self
    where
        F: Future<Output = Result<Value, Web3Error>> + Send + 'static,
        B: Future<Output = Result<Vec<Result<Value, Web3Error>>, Web3Error>> + Send + 'static,
        T: Transport<Out = F> + BatchTransport<Batch = B> + Send + Sync + 'static,
    {
        // `T: 'static` allows inspecting the concrete type through `Any`.
        if let Some(existing) = (&inner as &dyn Any).downcast_ref::<DynTransport>() {
            // Already type-erased: share the same underlying allocation.
            return DynTransport {
                inner: Arc::clone(&existing.inner),
            };
        }

        DynTransport {
            inner: Arc::new(inner),
        }
    }

    /// Attempts to view the wrapped transport as the concrete type `T`.
    ///
    /// Returns `None` when the wrapped transport is not a `T`.
    pub fn downcast<T: Any + Send + Sync + 'static>(&self) -> Option<&T> {
        let erased: &(dyn Any + Send + Sync) = (*self.inner).inner();
        erased.downcast_ref::<T>()
    }
}
impl Clone for DynTransport {
fn clone(&self) -> Self {
DynTransport {
inner: self.inner.clone(),
}
}
}
impl Transport for DynTransport {
type Out = BoxedFuture;
#[inline(always)]
fn prepare(&self, method: &str, params: Vec<Value>) -> (RequestId, Call) {
self.inner.prepare_boxed(method, params)
}
#[inline(always)]
fn send(&self, id: RequestId, request: Call) -> Self::Out {
self.inner.send_boxed(id, request)
}
#[inline(always)]
fn execute(&self, method: &str, params: Vec<Value>) -> Self::Out {
self.inner.execute_boxed(method, params)
}
}
impl BatchTransport for DynTransport {
type Batch = BoxedBatch;
fn send_batch<T>(&self, requests: T) -> Self::Batch
where
T: IntoIterator<Item = (RequestId, Call)>,
{
self.inner.send_batch_boxed(requests.into_iter().collect())
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test::prelude::*;

    /// Calls made through a `DynTransport` reach the wrapped test transport.
    #[test]
    fn dyn_transport() {
        let mut transport = TestTransport::new();
        // NOTE(review): the assertions below rely on the clone sharing recorded
        // state with `transport` — confirm against `TestTransport`.
        let dyn_transport = DynTransport::new(transport.clone());

        // Preparing through the wrapper must be observed by the test transport.
        let (id, call) = dyn_transport.prepare("test", vec![json!(28)]);
        transport.assert_request("test", &[json!(28)]);
        transport.assert_no_more_requests();

        // A queued response must come back through the wrapper.
        transport.add_response(json!(true));
        let response = dyn_transport.send(id, call).immediate().expect("success");
        assert_eq!(response, json!(true));

        // No response was queued for this call, so an error is expected; the
        // request itself must still have been forwarded.
        dyn_transport
            .execute("test", vec![json!(42)])
            .immediate()
            .expect_err("failed");
        transport.assert_request("test", &[json!(42)]);
        transport.assert_no_more_requests();
    }

    /// Wrapping a `DynTransport` again shares the existing `Arc` instead of
    /// nesting a second layer.
    #[test]
    // The clone is intentional: it keeps the first handle alive so the strong
    // count of 2 below is observable.
    #[allow(clippy::redundant_clone)]
    fn dyn_transport_does_not_double_wrap() {
        let transport = TestTransport::new();
        let dyn_transport = DynTransport::new(transport);
        assert_eq!(Arc::strong_count(&dyn_transport.inner), 1);

        let dyn_dyn_transport = DynTransport::new(dyn_transport.clone());
        assert_eq!(Arc::strong_count(&dyn_dyn_transport.inner), 2);
    }

    /// `DynTransport` can be moved into and used from another thread.
    #[test]
    fn dyn_transport_is_threadsafe() {
        let transport = TestTransport::new();
        let dyn_transport = DynTransport::new(transport);

        // Moving the transport into the closure fails to compile if it is not
        // `Send`.
        let handle = std::thread::spawn(move || {
            let _ = dyn_transport.prepare("test", vec![json!(28)]);
        });

        // Join so the closure is guaranteed to run and any panic inside it
        // fails this test instead of being lost with a detached thread.
        handle.join().expect("transport thread panicked");
    }

    /// The concrete transport can be recovered via `downcast`, including after
    /// re-wrapping an existing `DynTransport`.
    #[test]
    fn dyn_transport_is_downcastable() {
        let transport = TestTransport::new();
        let dyn_transport = DynTransport::new(transport);
        let concrete_transport: &TestTransport = dyn_transport.downcast().unwrap();
        concrete_transport.prepare("test", vec![json!(28)]);

        let dyn_dyn_transport = DynTransport::new(dyn_transport);
        let concrete_transport: &TestTransport = dyn_dyn_transport.downcast().unwrap();
        concrete_transport.prepare("test", vec![json!(28)]);
    }
}