Skip to main content

alloy_rpc_client/
client.rs

1use crate::{poller::PollerBuilder, BatchRequest, ClientBuilder, RpcCall};
2use alloy_json_rpc::{Id, Request, RpcRecv, RpcSend};
3use alloy_transport::{mock::Asserter, BoxTransport, IntoBoxTransport};
4use std::{
5    borrow::Cow,
6    ops::Deref,
7    sync::{
8        atomic::{AtomicU64, Ordering},
9        Arc, Weak,
10    },
11    time::Duration,
12};
13use tower::{layer::util::Identity, ServiceBuilder};
14
/// A non-owning [`Weak`] handle to the shared [`RpcClientInner`] behind an [`RpcClient`].
///
/// Obtain one with [`RpcClient::get_weak`].
pub type WeakClient = Weak<RpcClientInner>;

/// A borrowed [`RpcClient`], i.e. a plain reference to its [`RpcClientInner`].
pub type ClientRef<'a> = &'a RpcClientInner;

/// Parameter type of a JSON-RPC request with no parameters.
///
/// This is a zero-length array of unit values, so it carries no data.
pub type NoParams = [(); 0];

/// An optional pubsub frontend handle; `None` when no frontend was captured.
#[cfg(feature = "pubsub")]
type MaybePubsub = Option<alloy_pubsub::PubSubFrontend>;
26
27/// A JSON-RPC client.
28///
29/// [`RpcClient`] should never be instantiated directly. Instead, use
30/// [`ClientBuilder`].
31///
32/// [`ClientBuilder`]: crate::ClientBuilder
33#[derive(Debug)]
34pub struct RpcClient(Arc<RpcClientInner>);
35
36impl Clone for RpcClient {
37    fn clone(&self) -> Self {
38        Self(Arc::clone(&self.0))
39    }
40}
41
42impl RpcClient {
43    /// Create a new [`ClientBuilder`].
44    pub const fn builder() -> ClientBuilder<Identity> {
45        ClientBuilder { builder: ServiceBuilder::new() }
46    }
47}
48
49impl RpcClient {
50    /// Creates a new [`RpcClient`] with the given transport.
51    pub fn new(t: impl IntoBoxTransport, is_local: bool) -> Self {
52        Self::new_maybe_pubsub(
53            t,
54            is_local,
55            #[cfg(feature = "pubsub")]
56            None,
57        )
58    }
59
60    /// Create a new [`RpcClient`] with a transport that returns mocked responses from the given
61    /// [`Asserter`].
62    pub fn mocked(asserter: Asserter) -> Self {
63        Self::new(alloy_transport::mock::MockTransport::new(asserter), true)
64    }
65
66    /// Create a new [`RpcClient`] with an HTTP transport.
67    #[cfg(feature = "reqwest")]
68    pub fn new_http(url: reqwest::Url) -> Self {
69        let http = alloy_transport_http::Http::new(url);
70        let is_local = http.guess_local();
71        Self::new(http, is_local)
72    }
73
74    /// Create a new [`RpcClient`] with an HTTP transport using a pre-built [`reqwest::Client`].
75    #[cfg(feature = "reqwest")]
76    pub fn new_http_with_client(client: reqwest::Client, url: reqwest::Url) -> Self {
77        let http = alloy_transport_http::Http::with_client(client, url);
78        let is_local = http.guess_local();
79        Self::new(http, is_local)
80    }
81
82    /// Creates a new [`RpcClient`] with the given transport and a `MaybePubsub`.
83    fn new_maybe_pubsub(
84        t: impl IntoBoxTransport,
85        is_local: bool,
86        #[cfg(feature = "pubsub")] pubsub: MaybePubsub,
87    ) -> Self {
88        Self(Arc::new(RpcClientInner::new_maybe_pubsub(
89            t,
90            is_local,
91            #[cfg(feature = "pubsub")]
92            pubsub,
93        )))
94    }
95
96    /// Creates the [`RpcClient`] with the `main_transport` (ipc, ws, http) and a `layer` closure.
97    ///
98    /// The `layer` fn is intended to be [`tower::ServiceBuilder::service`] that layers the
99    /// transport services. The `main_transport` is expected to the type that actually emits the
100    /// request object: `PubSubFrontend`. This exists so that we can intercept the
101    /// `PubSubFrontend` which we need for [`RpcClientInner::pubsub_frontend`].
102    ///
103    /// This workaround exists because due to how [`tower::ServiceBuilder::service`] collapses into
104    /// a [`BoxTransport`] we wouldn't be obtain the `MaybePubsub` by downcasting the layered
105    /// `transport`.
106    pub(crate) fn new_layered<F, T, R>(is_local: bool, main_transport: T, layer: F) -> Self
107    where
108        F: FnOnce(T) -> R,
109        T: IntoBoxTransport,
110        R: IntoBoxTransport,
111    {
112        #[cfg(feature = "pubsub")]
113        {
114            let t = main_transport.clone().into_box_transport();
115            let maybe_pubsub = t.as_any().downcast_ref::<alloy_pubsub::PubSubFrontend>().cloned();
116            Self::new_maybe_pubsub(layer(main_transport), is_local, maybe_pubsub)
117        }
118
119        #[cfg(not(feature = "pubsub"))]
120        Self::new(layer(main_transport), is_local)
121    }
122
123    /// Creates a new [`RpcClient`] with the given inner client.
124    pub fn from_inner(inner: RpcClientInner) -> Self {
125        Self(Arc::new(inner))
126    }
127
128    /// Get a reference to the client.
129    pub const fn inner(&self) -> &Arc<RpcClientInner> {
130        &self.0
131    }
132
133    /// Convert the client into its inner type.
134    pub fn into_inner(self) -> Arc<RpcClientInner> {
135        self.0
136    }
137
138    /// Get a [`Weak`] reference to the client.
139    pub fn get_weak(&self) -> WeakClient {
140        Arc::downgrade(&self.0)
141    }
142
143    /// Borrow the client.
144    pub fn get_ref(&self) -> ClientRef<'_> {
145        &self.0
146    }
147
148    /// Sets the poll interval for the client in milliseconds.
149    ///
150    /// Note: `RpcClient` internally wraps an `Arc<RpcClientInner>`, so updating the poll interval
151    /// changes it for all `RpcClient` handles that share the same inner client.
152    pub fn with_poll_interval(self, poll_interval: Duration) -> Self {
153        self.inner().set_poll_interval(poll_interval);
154        self
155    }
156
157    /// Build a poller that polls a method with the given parameters.
158    ///
159    /// See [`PollerBuilder`] for examples and more details.
160    pub fn prepare_static_poller<Params, Resp>(
161        &self,
162        method: impl Into<Cow<'static, str>>,
163        params: Params,
164    ) -> PollerBuilder<Params, Resp>
165    where
166        Params: RpcSend + 'static,
167        Resp: RpcRecv + Clone,
168    {
169        PollerBuilder::new(self.get_weak(), method, params)
170    }
171
172    /// Create a new [`BatchRequest`] builder.
173    #[inline]
174    pub fn new_batch(&self) -> BatchRequest<'_> {
175        BatchRequest::new(&self.0)
176    }
177}
178
179impl Deref for RpcClient {
180    type Target = RpcClientInner;
181
182    #[inline]
183    fn deref(&self) -> &Self::Target {
184        &self.0
185    }
186}
187
/// The state behind an [`RpcClient`].
///
/// This struct manages a [`BoxTransport`] and a request ID counter. It is used to
/// build [`RpcCall`] and [`BatchRequest`] objects. The client delegates
/// transport access to the calls.
///
/// ### Note
///
/// IDs are allocated sequentially, starting at 0 unless a different start is chosen with
/// [`RpcClientInner::with_id`]. IDs are reserved via [`RpcClientInner::next_id`]. Note that
/// allocated IDs may not be used. There is no guarantee that a prepared [`RpcCall`] will be
/// sent, or that a sent call will receive a response.
#[derive(Debug)]
pub struct RpcClientInner {
    /// The underlying transport.
    pub(crate) transport: BoxTransport,
    /// Stores a handle to the PubSub service if pubsub.
    ///
    /// We store this handle separately from the _transport_ because, if built through the
    /// [`ClientBuilder`] with an additional layer, the actual transport can be an arbitrary type
    /// and we would be unable to obtain the `PubSubFrontend` by downcasting the `transport`. For
    /// example `RetryTransport<PubSubFrontend>`.
    #[cfg(feature = "pubsub")]
    pub(crate) pubsub: MaybePubsub,
    /// `true` if the transport is local.
    pub(crate) is_local: bool,
    /// The next request ID to use.
    pub(crate) id: AtomicU64,
    /// The poll interval for the client, stored as a number of milliseconds.
    pub(crate) poll_interval: AtomicU64,
}
219
220impl RpcClientInner {
221    /// Create a new [`RpcClient`] with the given transport.
222    ///
223    /// Note: Sets the poll interval to 250ms for local transports and 7s for remote transports by
224    /// default.
225    #[inline]
226    pub fn new(t: impl IntoBoxTransport, is_local: bool) -> Self {
227        Self {
228            transport: t.into_box_transport(),
229            #[cfg(feature = "pubsub")]
230            pubsub: None,
231            is_local,
232            id: AtomicU64::new(0),
233            poll_interval: if is_local { AtomicU64::new(250) } else { AtomicU64::new(7000) },
234        }
235    }
236
237    /// Create a new [`RpcClient`] with the given transport and an optional handle to the
238    /// `PubSubFrontend`.
239    pub(crate) fn new_maybe_pubsub(
240        t: impl IntoBoxTransport,
241        is_local: bool,
242        #[cfg(feature = "pubsub")] pubsub: MaybePubsub,
243    ) -> Self {
244        Self {
245            #[cfg(feature = "pubsub")]
246            pubsub,
247            ..Self::new(t, is_local)
248        }
249    }
250
251    /// Sets the starting ID for the client.
252    #[inline]
253    pub fn with_id(self, id: u64) -> Self {
254        Self { id: AtomicU64::new(id), ..self }
255    }
256
257    /// Returns the default poll interval (milliseconds) for the client.
258    pub fn poll_interval(&self) -> Duration {
259        Duration::from_millis(self.poll_interval.load(Ordering::Relaxed))
260    }
261
262    /// Set the poll interval for the client in milliseconds. Default:
263    /// 7s for remote and 250ms for local transports.
264    pub fn set_poll_interval(&self, poll_interval: Duration) {
265        self.poll_interval.store(poll_interval.as_millis() as u64, Ordering::Relaxed);
266    }
267
268    /// Returns a reference to the underlying transport.
269    #[inline]
270    pub const fn transport(&self) -> &BoxTransport {
271        &self.transport
272    }
273
274    /// Returns a mutable reference to the underlying transport.
275    #[inline]
276    pub const fn transport_mut(&mut self) -> &mut BoxTransport {
277        &mut self.transport
278    }
279
280    /// Consumes the client and returns the underlying transport.
281    #[inline]
282    pub fn into_transport(self) -> BoxTransport {
283        self.transport
284    }
285
286    /// Returns a reference to the pubsub frontend if the transport supports it.
287    #[cfg(feature = "pubsub")]
288    #[inline]
289    #[track_caller]
290    pub fn pubsub_frontend(&self) -> Option<&alloy_pubsub::PubSubFrontend> {
291        if let Some(pubsub) = &self.pubsub {
292            return Some(pubsub);
293        }
294        self.transport.as_any().downcast_ref::<alloy_pubsub::PubSubFrontend>()
295    }
296
297    /// Returns a reference to the pubsub frontend if the transport supports it.
298    ///
299    /// # Panics
300    ///
301    /// Panics if the transport does not support pubsub.
302    #[cfg(feature = "pubsub")]
303    #[inline]
304    #[track_caller]
305    pub fn expect_pubsub_frontend(&self) -> &alloy_pubsub::PubSubFrontend {
306        self.pubsub_frontend().expect("called pubsub_frontend on a non-pubsub transport")
307    }
308
309    /// Build a `JsonRpcRequest` with the given method and params.
310    ///
311    /// This function reserves an ID for the request, however the request is not sent.
312    ///
313    /// To send a request, use [`RpcClientInner::request`] and await the returned [`RpcCall`].
314    #[inline]
315    pub fn make_request<Params: RpcSend>(
316        &self,
317        method: impl Into<Cow<'static, str>>,
318        params: Params,
319    ) -> Request<Params> {
320        Request::new(method, self.next_id(), params)
321    }
322
323    /// `true` if the client believes the transport is local.
324    ///
325    /// This can be used to optimize remote API usage, or to change program
326    /// behavior on local endpoints. When the client is instantiated by parsing
327    /// a URL or other external input, this value is set on a best-efforts
328    /// basis and may be incorrect.
329    #[inline]
330    pub const fn is_local(&self) -> bool {
331        self.is_local
332    }
333
334    /// Set the `is_local` flag.
335    #[inline]
336    pub const fn set_local(&mut self, is_local: bool) {
337        self.is_local = is_local;
338    }
339
340    /// Reserve a request ID value. This is used to generate request IDs.
341    #[inline]
342    fn increment_id(&self) -> u64 {
343        self.id.fetch_add(1, Ordering::Relaxed)
344    }
345
346    /// Reserve a request ID u64.
347    #[inline]
348    pub fn next_id(&self) -> Id {
349        self.increment_id().into()
350    }
351
352    /// Prepares an [`RpcCall`].
353    ///
354    /// This function reserves an ID for the request, however the request is not sent.
355    /// To send a request, await the returned [`RpcCall`].
356    ///
357    /// # Note
358    ///
359    /// Serialization is done lazily. It will not be performed until the call is awaited.
360    /// This means that if a serializer error occurs, it will not be caught until the call is
361    /// awaited.
362    #[doc(alias = "prepare")]
363    pub fn request<Params: RpcSend, Resp: RpcRecv>(
364        &self,
365        method: impl Into<Cow<'static, str>>,
366        params: Params,
367    ) -> RpcCall<Params, Resp> {
368        let request = self.make_request(method, params);
369        RpcCall::new(request, self.transport.clone())
370    }
371
372    /// Prepares an [`RpcCall`] with no parameters.
373    ///
374    /// See [`request`](Self::request) for more details.
375    pub fn request_noparams<Resp: RpcRecv>(
376        &self,
377        method: impl Into<Cow<'static, str>>,
378    ) -> RpcCall<NoParams, Resp> {
379        self.request(method, [])
380    }
381}
382
#[cfg(feature = "pubsub")]
mod pubsub_impl {
    use super::*;
    use alloy_pubsub::{PubSubConnect, RawSubscription, Subscription};
    use alloy_transport::TransportResult;

    impl RpcClientInner {
        /// Looks up the [`RawSubscription`] registered under the given subscription ID.
        ///
        /// # Panics
        ///
        /// Panics if the transport does not support pubsub. The result of the frontend's
        /// lookup is also unwrapped, so this panics if that lookup returns an error.
        pub async fn get_raw_subscription(&self, id: alloy_primitives::B256) -> RawSubscription {
            let lookup = self.expect_pubsub_frontend().get_subscription(id);
            lookup.await.unwrap()
        }

        /// Looks up the subscription registered under the given ID and wraps it in a typed
        /// [`Subscription`].
        ///
        /// # Panics
        ///
        /// Panics under the same conditions as [`RpcClientInner::get_raw_subscription`].
        pub async fn get_subscription<T: serde::de::DeserializeOwned>(
            &self,
            id: alloy_primitives::B256,
        ) -> Subscription<T> {
            let raw = self.get_raw_subscription(id).await;
            Subscription::from(raw)
        }
    }

    impl RpcClient {
        /// Connects to a transport via a [`PubSubConnect`] implementer, using a default
        /// [`ClientBuilder`].
        pub async fn connect_pubsub<C: PubSubConnect>(connect: C) -> TransportResult<Self> {
            let builder = ClientBuilder::default();
            builder.pubsub(connect).await
        }

        /// Returns the currently configured channel size, i.e. the number of items
        /// to buffer in new subscription channels. Defaults to 16. See
        /// [`tokio::sync::broadcast`] for a description of relevant
        /// behavior.
        ///
        /// [`tokio::sync::broadcast`]: https://docs.rs/tokio/latest/tokio/sync/broadcast/index.html
        ///
        /// # Panics
        ///
        /// Panics if the transport does not support pubsub.
        #[track_caller]
        pub fn channel_size(&self) -> usize {
            let frontend = self.expect_pubsub_frontend();
            frontend.channel_size()
        }

        /// Sets the channel size on the pubsub frontend.
        ///
        /// # Panics
        ///
        /// Panics if the transport does not support pubsub.
        #[track_caller]
        pub fn set_channel_size(&self, size: usize) {
            let frontend = self.expect_pubsub_frontend();
            frontend.set_channel_size(size)
        }
    }
}
444
#[cfg(test)]
mod tests {
    use super::*;
    use similar_asserts::assert_eq;

    /// A poll interval set through `with_poll_interval` is read back unchanged.
    #[test]
    fn test_client_with_poll_interval() {
        let expected = Duration::from_secs(5);
        let url = reqwest::Url::parse("http://localhost").unwrap();

        let client = RpcClient::new_http(url).with_poll_interval(expected);

        assert_eq!(client.poll_interval(), expected);
    }
}