1use alloy_json_rpc::{
2 transform_response, try_deserialize_ok, Request, RequestMeta, RequestPacket, ResponsePacket,
3 RpcRecv, RpcResult, RpcSend,
4};
5use alloy_transport::{BoxTransport, IntoBoxTransport, RpcFut, TransportError, TransportResult};
6use futures::FutureExt;
7use serde_json::value::RawValue;
8use std::{
9 fmt,
10 future::Future,
11 marker::PhantomData,
12 pin::Pin,
13 task::{self, ready, Poll::Ready},
14};
15use tower::Service;
16
17#[must_use = "futures do nothing unless you `.await` or poll them"]
19#[pin_project::pin_project(project = CallStateProj)]
20enum CallState<Params>
21where
22 Params: RpcSend,
23{
24 Prepared {
25 request: Option<Request<Params>>,
26 connection: BoxTransport,
27 },
28 AwaitingResponse {
29 #[pin]
30 fut: <BoxTransport as Service<RequestPacket>>::Future,
31 },
32 Complete,
33}
34
35impl<Params> Clone for CallState<Params>
36where
37 Params: RpcSend,
38{
39 fn clone(&self) -> Self {
40 match self {
41 Self::Prepared { request, connection } => {
42 Self::Prepared { request: request.clone(), connection: connection.clone() }
43 }
44 _ => panic!("cloned after dispatch"),
45 }
46 }
47}
48
49impl<Params> fmt::Debug for CallState<Params>
50where
51 Params: RpcSend,
52{
53 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
54 f.write_str(match self {
55 Self::Prepared { .. } => "Prepared",
56 Self::AwaitingResponse { .. } => "AwaitingResponse",
57 Self::Complete => "Complete",
58 })
59 }
60}
61
62impl<Params> Future for CallState<Params>
63where
64 Params: RpcSend,
65{
66 type Output = TransportResult<Box<RawValue>>;
67
68 fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
69 loop {
70 match self.as_mut().project() {
71 CallStateProj::Prepared { connection, request } => {
72 if let Err(e) =
73 task::ready!(Service::<RequestPacket>::poll_ready(connection, cx))
74 {
75 self.set(Self::Complete);
76 return Ready(RpcResult::Err(e));
77 }
78
79 let request = request.take().expect("no request");
80 if tracing::enabled!(tracing::Level::TRACE) {
81 trace!(?request, "sending request");
82 } else {
83 debug!(method=%request.meta.method, id=%request.meta.id, "sending request");
84 }
85 let request = request.serialize();
86 let fut = match request {
87 Ok(request) => {
88 trace!(request=%request.serialized(), "serialized request");
89 connection.call(request.into())
90 }
91 Err(err) => {
92 trace!(?err, "failed to serialize request");
93 self.set(Self::Complete);
94 return Ready(RpcResult::Err(TransportError::ser_err(err)));
95 }
96 };
97 self.set(Self::AwaitingResponse { fut });
98 }
99 CallStateProj::AwaitingResponse { fut } => {
100 let res = match task::ready!(fut.poll(cx)) {
101 Ok(ResponsePacket::Single(res)) => Ready(transform_response(res)),
102 Err(e) => Ready(RpcResult::Err(e)),
103 _ => panic!("received batch response from single request"),
104 };
105 self.set(Self::Complete);
106 return res;
107 }
108 CallStateProj::Complete => {
109 panic!("Polled after completion");
110 }
111 }
112 }
113 }
114}
115
116#[must_use = "futures do nothing unless you `.await` or poll them"]
135#[pin_project::pin_project]
136#[derive(Clone)]
137pub struct RpcCall<Params, Resp, Output = Resp, Map = fn(Resp) -> Output>
138where
139 Params: RpcSend,
140 Map: FnOnce(Resp) -> Output,
141{
142 #[pin]
143 state: CallState<Params>,
144 map: Option<Map>,
145 _pd: core::marker::PhantomData<fn() -> (Resp, Output)>,
146}
147
148impl<Params, Resp, Output, Map> core::fmt::Debug for RpcCall<Params, Resp, Output, Map>
149where
150 Params: RpcSend,
151 Map: FnOnce(Resp) -> Output,
152{
153 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
154 f.debug_struct("RpcCall").field("state", &self.state).finish()
155 }
156}
157
158impl<Params, Resp> RpcCall<Params, Resp>
159where
160 Params: RpcSend,
161{
162 #[doc(hidden)]
163 pub fn new(req: Request<Params>, connection: impl IntoBoxTransport) -> Self {
164 Self {
165 state: CallState::Prepared {
166 request: Some(req),
167 connection: connection.into_box_transport(),
168 },
169 map: Some(std::convert::identity),
170 _pd: PhantomData,
171 }
172 }
173}
174
175impl<Params, Resp, Output, Map> RpcCall<Params, Resp, Output, Map>
176where
177 Params: RpcSend,
178 Map: FnOnce(Resp) -> Output,
179{
180 pub fn map_resp<NewOutput, NewMap>(
192 self,
193 map: NewMap,
194 ) -> RpcCall<Params, Resp, NewOutput, NewMap>
195 where
196 NewMap: FnOnce(Resp) -> NewOutput,
197 {
198 RpcCall { state: self.state, map: Some(map), _pd: PhantomData }
199 }
200
201 pub fn is_subscription(&self) -> bool {
207 self.request().meta.is_subscription()
208 }
209
210 pub fn set_is_subscription(&mut self) {
217 self.request_mut().meta.set_is_subscription();
218 }
219
220 pub fn set_subscription_status(&mut self, status: bool) {
222 self.request_mut().meta.set_subscription_status(status);
223 }
224
225 pub fn params(&mut self) -> &mut Params {
234 &mut self.request_mut().params
235 }
236
237 pub fn request(&self) -> &Request<Params> {
243 let CallState::Prepared { request, .. } = &self.state else {
244 panic!("Cannot get request after request has been sent");
245 };
246 request.as_ref().expect("no request in prepared")
247 }
248
249 pub fn method(&self) -> &str {
251 &self.request().meta.method
252 }
253
254 pub fn request_mut(&mut self) -> &mut Request<Params> {
260 let CallState::Prepared { request, .. } = &mut self.state else {
261 panic!("Cannot get request after request has been sent");
262 };
263 request.as_mut().expect("no request in prepared")
264 }
265
266 pub fn map_params<NewParams: RpcSend>(
268 self,
269 map: impl Fn(Params) -> NewParams,
270 ) -> RpcCall<NewParams, Resp, Output, Map> {
271 let CallState::Prepared { request, connection } = self.state else {
272 panic!("Cannot get request after request has been sent");
273 };
274 let request = request.expect("no request in prepared").map_params(map);
275 RpcCall {
276 state: CallState::Prepared { request: Some(request), connection },
277 map: self.map,
278 _pd: PhantomData,
279 }
280 }
281
282 pub fn map_meta(self, f: impl FnOnce(RequestMeta) -> RequestMeta) -> Self {
284 let CallState::Prepared { request, connection } = self.state else {
285 panic!("Cannot get request after request has been sent");
286 };
287 let request = request.expect("no request in prepared").map_meta(f);
288 Self {
289 state: CallState::Prepared { request: Some(request), connection },
290 map: self.map,
291 _pd: PhantomData,
292 }
293 }
294}
295
296impl<Params, Resp, Output, Map> RpcCall<&Params, Resp, Output, Map>
297where
298 Params: RpcSend + ToOwned,
299 Params::Owned: RpcSend,
300 Map: FnOnce(Resp) -> Output,
301{
302 pub fn into_owned_params(self) -> RpcCall<Params::Owned, Resp, Output, Map> {
308 let CallState::Prepared { request, connection } = self.state else {
309 panic!("Cannot get params after request has been sent");
310 };
311 let request = request.expect("no request in prepared").into_owned_params();
312
313 RpcCall {
314 state: CallState::Prepared { request: Some(request), connection },
315 map: self.map,
316 _pd: PhantomData,
317 }
318 }
319}
320
321impl<'a, Params, Resp, Output, Map> RpcCall<Params, Resp, Output, Map>
322where
323 Params: RpcSend + 'a,
324 Resp: RpcRecv,
325 Output: 'static,
326 Map: FnOnce(Resp) -> Output + Send + 'a,
327{
328 pub fn boxed(self) -> RpcFut<'a, Output> {
330 Box::pin(self)
331 }
332}
333
334impl<Params, Resp, Output, Map> Future for RpcCall<Params, Resp, Output, Map>
335where
336 Params: RpcSend,
337 Resp: RpcRecv,
338 Output: 'static,
339 Map: FnOnce(Resp) -> Output,
340{
341 type Output = TransportResult<Output>;
342
343 fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
344 let this = self.get_mut();
345 let resp = try_deserialize_ok(ready!(this.state.poll_unpin(cx)));
346 Ready(resp.map(this.map.take().expect("polled after completion")))
347 }
348}