alloy_rpc_client/
call.rs

1use alloy_json_rpc::{
2    transform_response, try_deserialize_ok, Request, RequestMeta, RequestPacket, ResponsePacket,
3    RpcRecv, RpcResult, RpcSend,
4};
5use alloy_transport::{BoxTransport, IntoBoxTransport, RpcFut, TransportError, TransportResult};
6use futures::FutureExt;
7use serde_json::value::RawValue;
8use std::{
9    fmt,
10    future::Future,
11    marker::PhantomData,
12    pin::Pin,
13    task::{self, ready, Poll::Ready},
14};
15use tower::Service;
16
/// The states of the [`RpcCall`] future.
///
/// A call starts out as `Prepared`, moves to `AwaitingResponse` once the
/// request has been serialized and handed to the transport, and ends in
/// `Complete` after a result (success or error) has been returned.
#[must_use = "futures do nothing unless you `.await` or poll them"]
#[pin_project::pin_project(project = CallStateProj)]
enum CallState<Params>
where
    Params: RpcSend,
{
    /// The request has been built but not yet serialized or dispatched.
    Prepared {
        /// The pending request. Wrapped in an `Option` so that polling can
        /// `take` it out of the pinned state.
        request: Option<Request<Params>>,
        /// The transport the request will be dispatched on.
        connection: BoxTransport,
    },
    /// The request has been handed to the transport and its response future is
    /// being driven.
    AwaitingResponse {
        /// The in-flight transport future.
        #[pin]
        fut: <BoxTransport as Service<RequestPacket>>::Future,
    },
    /// Terminal state. Polling again in this state panics.
    Complete,
}
34
35impl<Params> Clone for CallState<Params>
36where
37    Params: RpcSend,
38{
39    fn clone(&self) -> Self {
40        match self {
41            Self::Prepared { request, connection } => {
42                Self::Prepared { request: request.clone(), connection: connection.clone() }
43            }
44            _ => panic!("cloned after dispatch"),
45        }
46    }
47}
48
49impl<Params> fmt::Debug for CallState<Params>
50where
51    Params: RpcSend,
52{
53    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
54        f.write_str(match self {
55            Self::Prepared { .. } => "Prepared",
56            Self::AwaitingResponse { .. } => "AwaitingResponse",
57            Self::Complete => "Complete",
58        })
59    }
60}
61
62impl<Params> Future for CallState<Params>
63where
64    Params: RpcSend,
65{
66    type Output = TransportResult<Box<RawValue>>;
67
68    fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
69        loop {
70            match self.as_mut().project() {
71                CallStateProj::Prepared { connection, request } => {
72                    if let Err(e) =
73                        task::ready!(Service::<RequestPacket>::poll_ready(connection, cx))
74                    {
75                        self.set(Self::Complete);
76                        return Ready(RpcResult::Err(e));
77                    }
78
79                    let request = request.take().expect("no request");
80                    if tracing::enabled!(tracing::Level::TRACE) {
81                        trace!(?request, "sending request");
82                    } else {
83                        debug!(method=%request.meta.method, id=%request.meta.id, "sending request");
84                    }
85                    let request = request.serialize();
86                    let fut = match request {
87                        Ok(request) => {
88                            trace!(request=%request.serialized(), "serialized request");
89                            connection.call(request.into())
90                        }
91                        Err(err) => {
92                            trace!(?err, "failed to serialize request");
93                            self.set(Self::Complete);
94                            return Ready(RpcResult::Err(TransportError::ser_err(err)));
95                        }
96                    };
97                    self.set(Self::AwaitingResponse { fut });
98                }
99                CallStateProj::AwaitingResponse { fut } => {
100                    let res = match task::ready!(fut.poll(cx)) {
101                        Ok(ResponsePacket::Single(res)) => Ready(transform_response(res)),
102                        Err(e) => Ready(RpcResult::Err(e)),
103                        _ => panic!("received batch response from single request"),
104                    };
105                    self.set(Self::Complete);
106                    return res;
107                }
108                CallStateProj::Complete => {
109                    panic!("Polled after completion");
110                }
111            }
112        }
113    }
114}
115
116/// A prepared, but unsent, RPC call.
117///
118/// This is a future that will send the request when polled. It contains a
119/// [`Request`], a [`BoxTransport`], and knowledge of its expected response
120/// type. Upon awaiting, it will send the request and wait for the response. It
121/// will then deserialize the response into the expected type.
122///
123/// Errors are captured in the [`RpcResult`] type. Rpc Calls will result in
124/// either a successful response of the `Resp` type, an error response, or a
125/// transport error.
126///
127/// ### Note
128///
129/// Serializing the request is done lazily. The request is not serialized until
130/// the future is polled. This differs from the behavior of
131/// [`crate::BatchRequest`], which serializes greedily. This is because the
132/// batch request must immediately erase the `Param` type to allow batching of
133/// requests with different `Param` types, while the `RpcCall` may do so lazily.
134#[must_use = "futures do nothing unless you `.await` or poll them"]
135#[pin_project::pin_project]
136#[derive(Clone)]
137pub struct RpcCall<Params, Resp, Output = Resp, Map = fn(Resp) -> Output>
138where
139    Params: RpcSend,
140    Map: FnOnce(Resp) -> Output,
141{
142    #[pin]
143    state: CallState<Params>,
144    map: Option<Map>,
145    _pd: core::marker::PhantomData<fn() -> (Resp, Output)>,
146}
147
148impl<Params, Resp, Output, Map> core::fmt::Debug for RpcCall<Params, Resp, Output, Map>
149where
150    Params: RpcSend,
151    Map: FnOnce(Resp) -> Output,
152{
153    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
154        f.debug_struct("RpcCall").field("state", &self.state).finish()
155    }
156}
157
158impl<Params, Resp> RpcCall<Params, Resp>
159where
160    Params: RpcSend,
161{
162    #[doc(hidden)]
163    pub fn new(req: Request<Params>, connection: impl IntoBoxTransport) -> Self {
164        Self {
165            state: CallState::Prepared {
166                request: Some(req),
167                connection: connection.into_box_transport(),
168            },
169            map: Some(std::convert::identity),
170            _pd: PhantomData,
171        }
172    }
173}
174
175impl<Params, Resp, Output, Map> RpcCall<Params, Resp, Output, Map>
176where
177    Params: RpcSend,
178    Map: FnOnce(Resp) -> Output,
179{
180    /// Map the response to a different type. This is usable for converting
181    /// the response to a more usable type, e.g. changing `U64` to `u64`.
182    ///
183    /// ## Note
184    ///
185    /// Carefully review the rust documentation on [fn pointers] before passing
186    /// them to this function. Unless the pointer is specifically coerced to a
187    /// `fn(_) -> _`, the `NewMap` will be inferred as that function's unique
188    /// type. This can lead to confusing error messages.
189    ///
190    /// [fn pointers]: https://doc.rust-lang.org/std/primitive.fn.html#creating-function-pointers
191    pub fn map_resp<NewOutput, NewMap>(
192        self,
193        map: NewMap,
194    ) -> RpcCall<Params, Resp, NewOutput, NewMap>
195    where
196        NewMap: FnOnce(Resp) -> NewOutput,
197    {
198        RpcCall { state: self.state, map: Some(map), _pd: PhantomData }
199    }
200
201    /// Returns `true` if the request is a subscription.
202    ///
203    /// # Panics
204    ///
205    /// Panics if called after the request has been sent.
206    pub fn is_subscription(&self) -> bool {
207        self.request().meta.is_subscription()
208    }
209
210    /// Set the request to be a non-standard subscription (i.e. not
211    /// "eth_subscribe").
212    ///
213    /// # Panics
214    ///
215    /// Panics if called after the request has been sent.
216    pub fn set_is_subscription(&mut self) {
217        self.request_mut().meta.set_is_subscription();
218    }
219
220    /// Set the subscription status of the request.
221    pub fn set_subscription_status(&mut self, status: bool) {
222        self.request_mut().meta.set_subscription_status(status);
223    }
224
225    /// Get a mutable reference to the params of the request.
226    ///
227    /// This is useful for modifying the params after the request has been
228    /// prepared.
229    ///
230    /// # Panics
231    ///
232    /// Panics if called after the request has been sent.
233    pub fn params(&mut self) -> &mut Params {
234        &mut self.request_mut().params
235    }
236
237    /// Returns a reference to the request.
238    ///
239    /// # Panics
240    ///
241    /// Panics if called after the request has been sent.
242    pub fn request(&self) -> &Request<Params> {
243        let CallState::Prepared { request, .. } = &self.state else {
244            panic!("Cannot get request after request has been sent");
245        };
246        request.as_ref().expect("no request in prepared")
247    }
248
249    /// Returns the RPC method
250    pub fn method(&self) -> &str {
251        &self.request().meta.method
252    }
253
254    /// Returns a mutable reference to the request.
255    ///
256    /// # Panics
257    ///
258    /// Panics if called after the request has been sent.
259    pub fn request_mut(&mut self) -> &mut Request<Params> {
260        let CallState::Prepared { request, .. } = &mut self.state else {
261            panic!("Cannot get request after request has been sent");
262        };
263        request.as_mut().expect("no request in prepared")
264    }
265
266    /// Map the params of the request into a new type.
267    pub fn map_params<NewParams: RpcSend>(
268        self,
269        map: impl Fn(Params) -> NewParams,
270    ) -> RpcCall<NewParams, Resp, Output, Map> {
271        let CallState::Prepared { request, connection } = self.state else {
272            panic!("Cannot get request after request has been sent");
273        };
274        let request = request.expect("no request in prepared").map_params(map);
275        RpcCall {
276            state: CallState::Prepared { request: Some(request), connection },
277            map: self.map,
278            _pd: PhantomData,
279        }
280    }
281
282    /// Maps the metadata of the request using the provided function.
283    pub fn map_meta(self, f: impl FnOnce(RequestMeta) -> RequestMeta) -> Self {
284        let CallState::Prepared { request, connection } = self.state else {
285            panic!("Cannot get request after request has been sent");
286        };
287        let request = request.expect("no request in prepared").map_meta(f);
288        Self {
289            state: CallState::Prepared { request: Some(request), connection },
290            map: self.map,
291            _pd: PhantomData,
292        }
293    }
294}
295
296impl<Params, Resp, Output, Map> RpcCall<&Params, Resp, Output, Map>
297where
298    Params: RpcSend + ToOwned,
299    Params::Owned: RpcSend,
300    Map: FnOnce(Resp) -> Output,
301{
302    /// Convert this call into one with owned params, by cloning the params.
303    ///
304    /// # Panics
305    ///
306    /// Panics if called after the request has been polled.
307    pub fn into_owned_params(self) -> RpcCall<Params::Owned, Resp, Output, Map> {
308        let CallState::Prepared { request, connection } = self.state else {
309            panic!("Cannot get params after request has been sent");
310        };
311        let request = request.expect("no request in prepared").into_owned_params();
312
313        RpcCall {
314            state: CallState::Prepared { request: Some(request), connection },
315            map: self.map,
316            _pd: PhantomData,
317        }
318    }
319}
320
321impl<'a, Params, Resp, Output, Map> RpcCall<Params, Resp, Output, Map>
322where
323    Params: RpcSend + 'a,
324    Resp: RpcRecv,
325    Output: 'static,
326    Map: FnOnce(Resp) -> Output + Send + 'a,
327{
328    /// Convert this future into a boxed, pinned future, erasing its type.
329    pub fn boxed(self) -> RpcFut<'a, Output> {
330        Box::pin(self)
331    }
332}
333
334impl<Params, Resp, Output, Map> Future for RpcCall<Params, Resp, Output, Map>
335where
336    Params: RpcSend,
337    Resp: RpcRecv,
338    Output: 'static,
339    Map: FnOnce(Resp) -> Output,
340{
341    type Output = TransportResult<Output>;
342
343    fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
344        let this = self.get_mut();
345        let resp = try_deserialize_ok(ready!(this.state.poll_unpin(cx)));
346        Ready(resp.map(this.map.take().expect("polled after completion")))
347    }
348}