ic_web3/transports/
batch.rs

1//! Batching Transport
2
3use crate::{
4    error::{self, Error},
5    rpc, BatchTransport, RequestId, Transport,
6};
7use futures::{
8    channel::oneshot,
9    task::{Context, Poll},
10    Future, FutureExt,
11};
12use parking_lot::Mutex;
13use std::{collections::BTreeMap, pin::Pin, sync::Arc};
14
15type Pending = oneshot::Sender<error::Result<rpc::Value>>;
16type PendingRequests = Arc<Mutex<BTreeMap<RequestId, Pending>>>;
17
18/// Transport allowing to batch queries together.
19#[derive(Debug, Clone)]
20pub struct Batch<T> {
21    transport: T,
22    pending: PendingRequests,
23    batch: Arc<Mutex<Vec<(RequestId, rpc::Call)>>>,
24}
25
26impl<T> Batch<T>
27where
28    T: BatchTransport,
29{
30    /// Creates new Batch transport given existing transport supporing batch requests.
31    pub fn new(transport: T) -> Self {
32        Batch {
33            transport,
34            pending: Default::default(),
35            batch: Default::default(),
36        }
37    }
38
39    /// Sends all requests as a batch.
40    pub fn submit_batch(&self) -> impl Future<Output = error::Result<Vec<error::Result<rpc::Value>>>> {
41        let batch = std::mem::take(&mut *self.batch.lock());
42        let ids = batch.iter().map(|&(id, _)| id).collect::<Vec<_>>();
43
44        let batch = self.transport.send_batch(batch);
45        let pending = self.pending.clone();
46
47        async move {
48            let res = batch.await;
49            let mut pending = pending.lock();
50            for (idx, request_id) in ids.into_iter().enumerate() {
51                if let Some(rx) = pending.remove(&request_id) {
52                    // Ignore sending error
53                    let _ = match res {
54                        Ok(ref results) if results.len() > idx => rx.send(results[idx].clone()),
55                        Err(ref err) => rx.send(Err(err.clone())),
56                        _ => rx.send(Err(Error::Internal)),
57                    };
58                }
59            }
60            res
61        }
62    }
63}
64
65impl<T> Transport for Batch<T>
66where
67    T: BatchTransport,
68{
69    type Out = SingleResult;
70
71    fn prepare(&self, method: &str, params: Vec<rpc::Value>) -> (RequestId, rpc::Call) {
72        self.transport.prepare(method, params)
73    }
74
75    fn send(&self, id: RequestId, request: rpc::Call) -> Self::Out {
76        let (tx, rx) = oneshot::channel();
77        self.pending.lock().insert(id, tx);
78        self.batch.lock().push((id, request));
79
80        SingleResult(rx)
81    }
82}
83
84/// Result of calling a single method that will be part of the batch.
85/// Converts `oneshot::Receiver` error into `Error::Internal`
86pub struct SingleResult(oneshot::Receiver<error::Result<rpc::Value>>);
87
88impl Future for SingleResult {
89    type Output = error::Result<rpc::Value>;
90
91    fn poll(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> {
92        Poll::Ready(ready!(self.0.poll_unpin(ctx)).map_err(|_| Error::Internal)?)
93    }
94}