ic_web3/transports/
batch.rs1use crate::{
4 error::{self, Error},
5 rpc, BatchTransport, RequestId, Transport,
6};
7use futures::{
8 channel::oneshot,
9 task::{Context, Poll},
10 Future, FutureExt,
11};
12use parking_lot::Mutex;
13use std::{collections::BTreeMap, pin::Pin, sync::Arc};
14
15type Pending = oneshot::Sender<error::Result<rpc::Value>>;
16type PendingRequests = Arc<Mutex<BTreeMap<RequestId, Pending>>>;
17
18#[derive(Debug, Clone)]
20pub struct Batch<T> {
21 transport: T,
22 pending: PendingRequests,
23 batch: Arc<Mutex<Vec<(RequestId, rpc::Call)>>>,
24}
25
26impl<T> Batch<T>
27where
28 T: BatchTransport,
29{
30 pub fn new(transport: T) -> Self {
32 Batch {
33 transport,
34 pending: Default::default(),
35 batch: Default::default(),
36 }
37 }
38
39 pub fn submit_batch(&self) -> impl Future<Output = error::Result<Vec<error::Result<rpc::Value>>>> {
41 let batch = std::mem::take(&mut *self.batch.lock());
42 let ids = batch.iter().map(|&(id, _)| id).collect::<Vec<_>>();
43
44 let batch = self.transport.send_batch(batch);
45 let pending = self.pending.clone();
46
47 async move {
48 let res = batch.await;
49 let mut pending = pending.lock();
50 for (idx, request_id) in ids.into_iter().enumerate() {
51 if let Some(rx) = pending.remove(&request_id) {
52 let _ = match res {
54 Ok(ref results) if results.len() > idx => rx.send(results[idx].clone()),
55 Err(ref err) => rx.send(Err(err.clone())),
56 _ => rx.send(Err(Error::Internal)),
57 };
58 }
59 }
60 res
61 }
62 }
63}
64
65impl<T> Transport for Batch<T>
66where
67 T: BatchTransport,
68{
69 type Out = SingleResult;
70
71 fn prepare(&self, method: &str, params: Vec<rpc::Value>) -> (RequestId, rpc::Call) {
72 self.transport.prepare(method, params)
73 }
74
75 fn send(&self, id: RequestId, request: rpc::Call) -> Self::Out {
76 let (tx, rx) = oneshot::channel();
77 self.pending.lock().insert(id, tx);
78 self.batch.lock().push((id, request));
79
80 SingleResult(rx)
81 }
82}
83
84pub struct SingleResult(oneshot::Receiver<error::Result<rpc::Value>>);
87
88impl Future for SingleResult {
89 type Output = error::Result<rpc::Value>;
90
91 fn poll(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> {
92 Poll::Ready(ready!(self.0.poll_unpin(ctx)).map_err(|_| Error::Internal)?)
93 }
94}