alloy_rpc_client/
client.rs

1use crate::{poller::PollerBuilder, BatchRequest, ClientBuilder, RpcCall};
2use alloy_json_rpc::{Id, Request, RpcRecv, RpcSend};
3use alloy_transport::{mock::Asserter, BoxTransport, IntoBoxTransport};
4use std::{
5    borrow::Cow,
6    ops::Deref,
7    sync::{
8        atomic::{AtomicU64, Ordering},
9        Arc, Weak,
10    },
11    time::Duration,
12};
13use tower::{layer::util::Identity, ServiceBuilder};
14
15/// An [`RpcClient`] in a [`Weak`] reference.
16pub type WeakClient = Weak<RpcClientInner>;
17
18/// A borrowed [`RpcClient`].
19pub type ClientRef<'a> = &'a RpcClientInner;
20
21/// Parameter type of a JSON-RPC request with no parameters.
22pub type NoParams = [(); 0];
23
24#[cfg(feature = "pubsub")]
25type MaybePubsub = Option<alloy_pubsub::PubSubFrontend>;
26
27#[cfg(not(feature = "pubsub"))]
28type MaybePubsub = Option<()>;
29
30/// A JSON-RPC client.
31///
32/// [`RpcClient`] should never be instantiated directly. Instead, use
33/// [`ClientBuilder`].
34///
35/// [`ClientBuilder`]: crate::ClientBuilder
36#[derive(Debug)]
37pub struct RpcClient(Arc<RpcClientInner>);
38
39impl Clone for RpcClient {
40    fn clone(&self) -> Self {
41        Self(Arc::clone(&self.0))
42    }
43}
44
45impl RpcClient {
46    /// Create a new [`ClientBuilder`].
47    pub const fn builder() -> ClientBuilder<Identity> {
48        ClientBuilder { builder: ServiceBuilder::new() }
49    }
50}
51
52impl RpcClient {
53    /// Creates a new [`RpcClient`] with the given transport.
54    pub fn new(t: impl IntoBoxTransport, is_local: bool) -> Self {
55        Self::new_maybe_pubsub(t, is_local, None)
56    }
57
58    /// Create a new [`RpcClient`] with a transport that returns mocked responses from the given
59    /// [`Asserter`].
60    pub fn mocked(asserter: Asserter) -> Self {
61        Self::new(alloy_transport::mock::MockTransport::new(asserter), true)
62    }
63
64    /// Create a new [`RpcClient`] with an HTTP transport.
65    #[cfg(feature = "reqwest")]
66    pub fn new_http(url: reqwest::Url) -> Self {
67        let http = alloy_transport_http::Http::new(url);
68        let is_local = http.guess_local();
69        Self::new(http, is_local)
70    }
71
72    /// Creates a new [`RpcClient`] with the given transport and an optional [`MaybePubsub`].
73    pub(crate) fn new_maybe_pubsub(
74        t: impl IntoBoxTransport,
75        is_local: bool,
76        pubsub: MaybePubsub,
77    ) -> Self {
78        Self(Arc::new(RpcClientInner::new_maybe_pubsub(t, is_local, pubsub)))
79    }
80
81    /// Creates the [`RpcClient`] with the `main_transport` (ipc, ws, http) and a `layer` closure.
82    ///
83    /// The `layer` fn is intended to be [`tower::ServiceBuilder::service`] that layers the
84    /// transport services. The `main_transport` is expected to the type that actually emits the
85    /// request object: `PubSubFrontend`. This exists so that we can intercept the
86    /// `PubSubFrontend` which we need for [`RpcClientInner::pubsub_frontend`].
87    /// This workaround exists because due to how [`tower::ServiceBuilder::service`] collapses into
88    /// a [`BoxTransport`] we wouldn't be obtain the [`MaybePubsub`] by downcasting the layered
89    /// `transport`.
90    pub(crate) fn new_layered<F, T, R>(is_local: bool, main_transport: T, layer: F) -> Self
91    where
92        F: FnOnce(T) -> R,
93        T: IntoBoxTransport,
94        R: IntoBoxTransport,
95    {
96        #[cfg(feature = "pubsub")]
97        {
98            let t = main_transport.clone().into_box_transport();
99            let maybe_pubsub = t.as_any().downcast_ref::<alloy_pubsub::PubSubFrontend>().cloned();
100            Self::new_maybe_pubsub(layer(main_transport), is_local, maybe_pubsub)
101        }
102
103        #[cfg(not(feature = "pubsub"))]
104        Self::new(layer(main_transport), is_local)
105    }
106
107    /// Creates a new [`RpcClient`] with the given inner client.
108    pub fn from_inner(inner: RpcClientInner) -> Self {
109        Self(Arc::new(inner))
110    }
111
112    /// Get a reference to the client.
113    pub const fn inner(&self) -> &Arc<RpcClientInner> {
114        &self.0
115    }
116
117    /// Convert the client into its inner type.
118    pub fn into_inner(self) -> Arc<RpcClientInner> {
119        self.0
120    }
121
122    /// Get a [`Weak`] reference to the client.
123    pub fn get_weak(&self) -> WeakClient {
124        Arc::downgrade(&self.0)
125    }
126
127    /// Borrow the client.
128    pub fn get_ref(&self) -> ClientRef<'_> {
129        &self.0
130    }
131
132    /// Sets the poll interval for the client in milliseconds.
133    ///
134    /// Note: This will only set the poll interval for the client if it is the only reference to the
135    /// inner client. If the reference is held by many, then it will not update the poll interval.
136    pub fn with_poll_interval(self, poll_interval: Duration) -> Self {
137        self.inner().set_poll_interval(poll_interval);
138        self
139    }
140
141    /// Build a poller that polls a method with the given parameters.
142    ///
143    /// See [`PollerBuilder`] for examples and more details.
144    pub fn prepare_static_poller<Params, Resp>(
145        &self,
146        method: impl Into<Cow<'static, str>>,
147        params: Params,
148    ) -> PollerBuilder<Params, Resp>
149    where
150        Params: RpcSend + 'static,
151        Resp: RpcRecv + Clone,
152    {
153        PollerBuilder::new(self.get_weak(), method, params)
154    }
155
156    /// Boxes the transport.
157    #[deprecated(since = "0.9.0", note = "`RpcClient` is now always boxed")]
158    #[allow(clippy::missing_const_for_fn)]
159    pub fn boxed(self) -> Self {
160        self
161    }
162
163    /// Create a new [`BatchRequest`] builder.
164    #[inline]
165    pub fn new_batch(&self) -> BatchRequest<'_> {
166        BatchRequest::new(&self.0)
167    }
168}
169
170impl Deref for RpcClient {
171    type Target = RpcClientInner;
172
173    #[inline]
174    fn deref(&self) -> &Self::Target {
175        &self.0
176    }
177}
178
179/// A JSON-RPC client.
180///
181/// This struct manages a [`BoxTransport`] and a request ID counter. It is used to
182/// build [`RpcCall`] and [`BatchRequest`] objects. The client delegates
183/// transport access to the calls.
184///
185/// ### Note
186///
187/// IDs are allocated sequentially, starting at 0. IDs are reserved via
188/// [`RpcClientInner::next_id`]. Note that allocated IDs may not be used. There
189/// is no guarantee that a prepared [`RpcCall`] will be sent, or that a sent
190/// call will receive a response.
191#[derive(Debug)]
192pub struct RpcClientInner {
193    /// The underlying transport.
194    pub(crate) transport: BoxTransport,
195    /// Stores a handle to the PubSub service if pubsub.
196    ///
197    /// We store this _transport_ because if built through the [`ClientBuilder`] with an additional
198    /// layer the actual transport can be an arbitrary type and we would be unable to obtain the
199    /// `PubSubFrontend` by downcasting the `transport`. For example
200    /// `RetryTransport<PubSubFrontend>`.
201    #[allow(unused)]
202    pub(crate) pubsub: MaybePubsub,
203    /// `true` if the transport is local.
204    pub(crate) is_local: bool,
205    /// The next request ID to use.
206    pub(crate) id: AtomicU64,
207    /// The poll interval for the client in milliseconds.
208    pub(crate) poll_interval: AtomicU64,
209}
210
211impl RpcClientInner {
212    /// Create a new [`RpcClient`] with the given transport.
213    ///
214    /// Note: Sets the poll interval to 250ms for local transports and 7s for remote transports by
215    /// default.
216    #[inline]
217    pub fn new(t: impl IntoBoxTransport, is_local: bool) -> Self {
218        Self {
219            transport: t.into_box_transport(),
220            pubsub: None,
221            is_local,
222            id: AtomicU64::new(0),
223            poll_interval: if is_local { AtomicU64::new(250) } else { AtomicU64::new(7000) },
224        }
225    }
226
227    /// Create a new [`RpcClient`] with the given transport and an optional handle to the
228    /// `PubSubFrontend`.
229    pub(crate) fn new_maybe_pubsub(
230        t: impl IntoBoxTransport,
231        is_local: bool,
232        pubsub: MaybePubsub,
233    ) -> Self {
234        Self { pubsub, ..Self::new(t.into_box_transport(), is_local) }
235    }
236
237    /// Sets the starting ID for the client.
238    #[inline]
239    pub fn with_id(self, id: u64) -> Self {
240        Self { id: AtomicU64::new(id), ..self }
241    }
242
243    /// Returns the default poll interval (milliseconds) for the client.
244    pub fn poll_interval(&self) -> Duration {
245        Duration::from_millis(self.poll_interval.load(Ordering::Relaxed))
246    }
247
248    /// Set the poll interval for the client in milliseconds. Default:
249    /// 7s for remote and 250ms for local transports.
250    pub fn set_poll_interval(&self, poll_interval: Duration) {
251        self.poll_interval.store(poll_interval.as_millis() as u64, Ordering::Relaxed);
252    }
253
254    /// Returns a reference to the underlying transport.
255    #[inline]
256    pub const fn transport(&self) -> &BoxTransport {
257        &self.transport
258    }
259
260    /// Returns a mutable reference to the underlying transport.
261    #[inline]
262    pub fn transport_mut(&mut self) -> &mut BoxTransport {
263        &mut self.transport
264    }
265
266    /// Consumes the client and returns the underlying transport.
267    #[inline]
268    pub fn into_transport(self) -> BoxTransport {
269        self.transport
270    }
271
272    /// Returns a reference to the pubsub frontend if the transport supports it.
273    #[cfg(feature = "pubsub")]
274    #[inline]
275    #[track_caller]
276    pub fn pubsub_frontend(&self) -> Option<&alloy_pubsub::PubSubFrontend> {
277        if let Some(pubsub) = &self.pubsub {
278            return Some(pubsub);
279        }
280        self.transport.as_any().downcast_ref::<alloy_pubsub::PubSubFrontend>()
281    }
282
283    /// Returns a reference to the pubsub frontend if the transport supports it.
284    ///
285    /// # Panics
286    ///
287    /// Panics if the transport does not support pubsub.
288    #[cfg(feature = "pubsub")]
289    #[inline]
290    #[track_caller]
291    pub fn expect_pubsub_frontend(&self) -> &alloy_pubsub::PubSubFrontend {
292        self.pubsub_frontend().expect("called pubsub_frontend on a non-pubsub transport")
293    }
294
295    /// Build a `JsonRpcRequest` with the given method and params.
296    ///
297    /// This function reserves an ID for the request, however the request is not sent.
298    ///
299    /// To send a request, use [`RpcClientInner::request`] and await the returned [`RpcCall`].
300    #[inline]
301    pub fn make_request<Params: RpcSend>(
302        &self,
303        method: impl Into<Cow<'static, str>>,
304        params: Params,
305    ) -> Request<Params> {
306        Request::new(method, self.next_id(), params)
307    }
308
309    /// `true` if the client believes the transport is local.
310    ///
311    /// This can be used to optimize remote API usage, or to change program
312    /// behavior on local endpoints. When the client is instantiated by parsing
313    /// a URL or other external input, this value is set on a best-efforts
314    /// basis and may be incorrect.
315    #[inline]
316    pub const fn is_local(&self) -> bool {
317        self.is_local
318    }
319
320    /// Set the `is_local` flag.
321    #[inline]
322    pub fn set_local(&mut self, is_local: bool) {
323        self.is_local = is_local;
324    }
325
326    /// Reserve a request ID value. This is used to generate request IDs.
327    #[inline]
328    fn increment_id(&self) -> u64 {
329        self.id.fetch_add(1, Ordering::Relaxed)
330    }
331
332    /// Reserve a request ID u64.
333    #[inline]
334    pub fn next_id(&self) -> Id {
335        self.increment_id().into()
336    }
337
338    /// Prepares an [`RpcCall`].
339    ///
340    /// This function reserves an ID for the request, however the request is not sent.
341    /// To send a request, await the returned [`RpcCall`].
342    ///
343    /// # Note
344    ///
345    /// Serialization is done lazily. It will not be performed until the call is awaited.
346    /// This means that if a serializer error occurs, it will not be caught until the call is
347    /// awaited.
348    #[doc(alias = "prepare")]
349    pub fn request<Params: RpcSend, Resp: RpcRecv>(
350        &self,
351        method: impl Into<Cow<'static, str>>,
352        params: Params,
353    ) -> RpcCall<Params, Resp> {
354        let request = self.make_request(method, params);
355        RpcCall::new(request, self.transport.clone())
356    }
357
358    /// Prepares an [`RpcCall`] with no parameters.
359    ///
360    /// See [`request`](Self::request) for more details.
361    pub fn request_noparams<Resp: RpcRecv>(
362        &self,
363        method: impl Into<Cow<'static, str>>,
364    ) -> RpcCall<NoParams, Resp> {
365        self.request(method, [])
366    }
367
368    /// Type erase the service in the transport, allowing it to be used in a
369    /// generic context.
370    #[deprecated(since = "0.9.0", note = "`RpcClientInner` is now always boxed")]
371    #[allow(clippy::missing_const_for_fn)]
372    pub fn boxed(self) -> Self {
373        self
374    }
375}
376
377#[cfg(feature = "pubsub")]
378mod pubsub_impl {
379    use super::*;
380    use alloy_pubsub::{PubSubConnect, RawSubscription, Subscription};
381    use alloy_transport::TransportResult;
382
383    impl RpcClientInner {
384        /// Get a [`RawSubscription`] for the given subscription ID.
385        ///
386        /// # Panics
387        ///
388        /// Panics if the transport does not support pubsub.
389        pub async fn get_raw_subscription(&self, id: alloy_primitives::B256) -> RawSubscription {
390            self.expect_pubsub_frontend().get_subscription(id).await.unwrap()
391        }
392
393        /// Get a [`Subscription`] for the given subscription ID.
394        ///
395        /// # Panics
396        ///
397        /// Panics if the transport does not support pubsub.
398        pub async fn get_subscription<T: serde::de::DeserializeOwned>(
399            &self,
400            id: alloy_primitives::B256,
401        ) -> Subscription<T> {
402            Subscription::from(self.get_raw_subscription(id).await)
403        }
404    }
405
406    impl RpcClient {
407        /// Connect to a transport via a [`PubSubConnect`] implementor.
408        pub async fn connect_pubsub<C: PubSubConnect>(connect: C) -> TransportResult<Self> {
409            ClientBuilder::default().pubsub(connect).await
410        }
411
412        /// Get the currently configured channel size. This is the number of items
413        /// to buffer in new subscription channels. Defaults to 16. See
414        /// [`tokio::sync::broadcast`] for a description of relevant
415        /// behavior.
416        ///
417        /// [`tokio::sync::broadcast`]: https://docs.rs/tokio/latest/tokio/sync/broadcast/index.html
418        ///
419        /// # Panics
420        ///
421        /// Panics if the transport does not support pubsub.
422        #[track_caller]
423        pub fn channel_size(&self) -> usize {
424            self.expect_pubsub_frontend().channel_size()
425        }
426
427        /// Set the channel size.
428        ///
429        /// # Panics
430        ///
431        /// Panics if the transport does not support pubsub.
432        #[track_caller]
433        pub fn set_channel_size(&self, size: usize) {
434            self.expect_pubsub_frontend().set_channel_size(size)
435        }
436    }
437}
438
439#[cfg(test)]
440mod tests {
441    use super::*;
442    use similar_asserts::assert_eq;
443
444    #[test]
445    fn test_client_with_poll_interval() {
446        let poll_interval = Duration::from_millis(5_000);
447        let client = RpcClient::new_http(reqwest::Url::parse("http://localhost").unwrap())
448            .with_poll_interval(poll_interval);
449        assert_eq!(client.poll_interval(), poll_interval);
450    }
451}