alloy_rpc_client/client.rs
1use crate::{poller::PollerBuilder, BatchRequest, ClientBuilder, RpcCall};
2use alloy_json_rpc::{Id, Request, RpcRecv, RpcSend};
3use alloy_transport::{mock::Asserter, BoxTransport, IntoBoxTransport};
4use std::{
5 borrow::Cow,
6 ops::Deref,
7 sync::{
8 atomic::{AtomicU64, Ordering},
9 Arc, Weak,
10 },
11 time::Duration,
12};
13use tower::{layer::util::Identity, ServiceBuilder};
14
15/// An [`RpcClient`] in a [`Weak`] reference.
16pub type WeakClient = Weak<RpcClientInner>;
17
18/// A borrowed [`RpcClient`].
19pub type ClientRef<'a> = &'a RpcClientInner;
20
21/// Parameter type of a JSON-RPC request with no parameters.
22pub type NoParams = [(); 0];
23
24#[cfg(feature = "pubsub")]
25type MaybePubsub = Option<alloy_pubsub::PubSubFrontend>;
26
27/// A JSON-RPC client.
28///
29/// [`RpcClient`] should never be instantiated directly. Instead, use
30/// [`ClientBuilder`].
31///
32/// [`ClientBuilder`]: crate::ClientBuilder
33#[derive(Debug)]
34pub struct RpcClient(Arc<RpcClientInner>);
35
36impl Clone for RpcClient {
37 fn clone(&self) -> Self {
38 Self(Arc::clone(&self.0))
39 }
40}
41
42impl RpcClient {
43 /// Create a new [`ClientBuilder`].
44 pub const fn builder() -> ClientBuilder<Identity> {
45 ClientBuilder { builder: ServiceBuilder::new() }
46 }
47}
48
49impl RpcClient {
50 /// Creates a new [`RpcClient`] with the given transport.
51 pub fn new(t: impl IntoBoxTransport, is_local: bool) -> Self {
52 Self::new_maybe_pubsub(
53 t,
54 is_local,
55 #[cfg(feature = "pubsub")]
56 None,
57 )
58 }
59
60 /// Create a new [`RpcClient`] with a transport that returns mocked responses from the given
61 /// [`Asserter`].
62 pub fn mocked(asserter: Asserter) -> Self {
63 Self::new(alloy_transport::mock::MockTransport::new(asserter), true)
64 }
65
66 /// Create a new [`RpcClient`] with an HTTP transport.
67 #[cfg(feature = "reqwest")]
68 pub fn new_http(url: reqwest::Url) -> Self {
69 let http = alloy_transport_http::Http::new(url);
70 let is_local = http.guess_local();
71 Self::new(http, is_local)
72 }
73
74 /// Create a new [`RpcClient`] with an HTTP transport using a pre-built [`reqwest::Client`].
75 #[cfg(feature = "reqwest")]
76 pub fn new_http_with_client(client: reqwest::Client, url: reqwest::Url) -> Self {
77 let http = alloy_transport_http::Http::with_client(client, url);
78 let is_local = http.guess_local();
79 Self::new(http, is_local)
80 }
81
82 /// Creates a new [`RpcClient`] with the given transport and a `MaybePubsub`.
83 fn new_maybe_pubsub(
84 t: impl IntoBoxTransport,
85 is_local: bool,
86 #[cfg(feature = "pubsub")] pubsub: MaybePubsub,
87 ) -> Self {
88 Self(Arc::new(RpcClientInner::new_maybe_pubsub(
89 t,
90 is_local,
91 #[cfg(feature = "pubsub")]
92 pubsub,
93 )))
94 }
95
96 /// Creates the [`RpcClient`] with the `main_transport` (ipc, ws, http) and a `layer` closure.
97 ///
98 /// The `layer` fn is intended to be [`tower::ServiceBuilder::service`] that layers the
99 /// transport services. The `main_transport` is expected to the type that actually emits the
100 /// request object: `PubSubFrontend`. This exists so that we can intercept the
101 /// `PubSubFrontend` which we need for [`RpcClientInner::pubsub_frontend`].
102 ///
103 /// This workaround exists because due to how [`tower::ServiceBuilder::service`] collapses into
104 /// a [`BoxTransport`] we wouldn't be obtain the `MaybePubsub` by downcasting the layered
105 /// `transport`.
106 pub(crate) fn new_layered<F, T, R>(is_local: bool, main_transport: T, layer: F) -> Self
107 where
108 F: FnOnce(T) -> R,
109 T: IntoBoxTransport,
110 R: IntoBoxTransport,
111 {
112 #[cfg(feature = "pubsub")]
113 {
114 let t = main_transport.clone().into_box_transport();
115 let maybe_pubsub = t.as_any().downcast_ref::<alloy_pubsub::PubSubFrontend>().cloned();
116 Self::new_maybe_pubsub(layer(main_transport), is_local, maybe_pubsub)
117 }
118
119 #[cfg(not(feature = "pubsub"))]
120 Self::new(layer(main_transport), is_local)
121 }
122
123 /// Creates a new [`RpcClient`] with the given inner client.
124 pub fn from_inner(inner: RpcClientInner) -> Self {
125 Self(Arc::new(inner))
126 }
127
128 /// Get a reference to the client.
129 pub const fn inner(&self) -> &Arc<RpcClientInner> {
130 &self.0
131 }
132
133 /// Convert the client into its inner type.
134 pub fn into_inner(self) -> Arc<RpcClientInner> {
135 self.0
136 }
137
138 /// Get a [`Weak`] reference to the client.
139 pub fn get_weak(&self) -> WeakClient {
140 Arc::downgrade(&self.0)
141 }
142
143 /// Borrow the client.
144 pub fn get_ref(&self) -> ClientRef<'_> {
145 &self.0
146 }
147
148 /// Sets the poll interval for the client in milliseconds.
149 ///
150 /// Note: `RpcClient` internally wraps an `Arc<RpcClientInner>`, so updating the poll interval
151 /// changes it for all `RpcClient` handles that share the same inner client.
152 pub fn with_poll_interval(self, poll_interval: Duration) -> Self {
153 self.inner().set_poll_interval(poll_interval);
154 self
155 }
156
157 /// Build a poller that polls a method with the given parameters.
158 ///
159 /// See [`PollerBuilder`] for examples and more details.
160 pub fn prepare_static_poller<Params, Resp>(
161 &self,
162 method: impl Into<Cow<'static, str>>,
163 params: Params,
164 ) -> PollerBuilder<Params, Resp>
165 where
166 Params: RpcSend + 'static,
167 Resp: RpcRecv + Clone,
168 {
169 PollerBuilder::new(self.get_weak(), method, params)
170 }
171
172 /// Create a new [`BatchRequest`] builder.
173 #[inline]
174 pub fn new_batch(&self) -> BatchRequest<'_> {
175 BatchRequest::new(&self.0)
176 }
177}
178
179impl Deref for RpcClient {
180 type Target = RpcClientInner;
181
182 #[inline]
183 fn deref(&self) -> &Self::Target {
184 &self.0
185 }
186}
187
188/// A JSON-RPC client.
189///
190/// This struct manages a [`BoxTransport`] and a request ID counter. It is used to
191/// build [`RpcCall`] and [`BatchRequest`] objects. The client delegates
192/// transport access to the calls.
193///
194/// ### Note
195///
196/// IDs are allocated sequentially, starting at 0. IDs are reserved via
197/// [`RpcClientInner::next_id`]. Note that allocated IDs may not be used. There
198/// is no guarantee that a prepared [`RpcCall`] will be sent, or that a sent
199/// call will receive a response.
200#[derive(Debug)]
201pub struct RpcClientInner {
202 /// The underlying transport.
203 pub(crate) transport: BoxTransport,
204 /// Stores a handle to the PubSub service if pubsub.
205 ///
206 /// We store this _transport_ because if built through the [`ClientBuilder`] with an additional
207 /// layer the actual transport can be an arbitrary type and we would be unable to obtain the
208 /// `PubSubFrontend` by downcasting the `transport`. For example
209 /// `RetryTransport<PubSubFrontend>`.
210 #[cfg(feature = "pubsub")]
211 pub(crate) pubsub: MaybePubsub,
212 /// `true` if the transport is local.
213 pub(crate) is_local: bool,
214 /// The next request ID to use.
215 pub(crate) id: AtomicU64,
216 /// The poll interval for the client in milliseconds.
217 pub(crate) poll_interval: AtomicU64,
218}
219
220impl RpcClientInner {
221 /// Create a new [`RpcClient`] with the given transport.
222 ///
223 /// Note: Sets the poll interval to 250ms for local transports and 7s for remote transports by
224 /// default.
225 #[inline]
226 pub fn new(t: impl IntoBoxTransport, is_local: bool) -> Self {
227 Self {
228 transport: t.into_box_transport(),
229 #[cfg(feature = "pubsub")]
230 pubsub: None,
231 is_local,
232 id: AtomicU64::new(0),
233 poll_interval: if is_local { AtomicU64::new(250) } else { AtomicU64::new(7000) },
234 }
235 }
236
237 /// Create a new [`RpcClient`] with the given transport and an optional handle to the
238 /// `PubSubFrontend`.
239 pub(crate) fn new_maybe_pubsub(
240 t: impl IntoBoxTransport,
241 is_local: bool,
242 #[cfg(feature = "pubsub")] pubsub: MaybePubsub,
243 ) -> Self {
244 Self {
245 #[cfg(feature = "pubsub")]
246 pubsub,
247 ..Self::new(t, is_local)
248 }
249 }
250
251 /// Sets the starting ID for the client.
252 #[inline]
253 pub fn with_id(self, id: u64) -> Self {
254 Self { id: AtomicU64::new(id), ..self }
255 }
256
257 /// Returns the default poll interval (milliseconds) for the client.
258 pub fn poll_interval(&self) -> Duration {
259 Duration::from_millis(self.poll_interval.load(Ordering::Relaxed))
260 }
261
262 /// Set the poll interval for the client in milliseconds. Default:
263 /// 7s for remote and 250ms for local transports.
264 pub fn set_poll_interval(&self, poll_interval: Duration) {
265 self.poll_interval.store(poll_interval.as_millis() as u64, Ordering::Relaxed);
266 }
267
268 /// Returns a reference to the underlying transport.
269 #[inline]
270 pub const fn transport(&self) -> &BoxTransport {
271 &self.transport
272 }
273
274 /// Returns a mutable reference to the underlying transport.
275 #[inline]
276 pub const fn transport_mut(&mut self) -> &mut BoxTransport {
277 &mut self.transport
278 }
279
280 /// Consumes the client and returns the underlying transport.
281 #[inline]
282 pub fn into_transport(self) -> BoxTransport {
283 self.transport
284 }
285
286 /// Returns a reference to the pubsub frontend if the transport supports it.
287 #[cfg(feature = "pubsub")]
288 #[inline]
289 #[track_caller]
290 pub fn pubsub_frontend(&self) -> Option<&alloy_pubsub::PubSubFrontend> {
291 if let Some(pubsub) = &self.pubsub {
292 return Some(pubsub);
293 }
294 self.transport.as_any().downcast_ref::<alloy_pubsub::PubSubFrontend>()
295 }
296
297 /// Returns a reference to the pubsub frontend if the transport supports it.
298 ///
299 /// # Panics
300 ///
301 /// Panics if the transport does not support pubsub.
302 #[cfg(feature = "pubsub")]
303 #[inline]
304 #[track_caller]
305 pub fn expect_pubsub_frontend(&self) -> &alloy_pubsub::PubSubFrontend {
306 self.pubsub_frontend().expect("called pubsub_frontend on a non-pubsub transport")
307 }
308
309 /// Build a `JsonRpcRequest` with the given method and params.
310 ///
311 /// This function reserves an ID for the request, however the request is not sent.
312 ///
313 /// To send a request, use [`RpcClientInner::request`] and await the returned [`RpcCall`].
314 #[inline]
315 pub fn make_request<Params: RpcSend>(
316 &self,
317 method: impl Into<Cow<'static, str>>,
318 params: Params,
319 ) -> Request<Params> {
320 Request::new(method, self.next_id(), params)
321 }
322
323 /// `true` if the client believes the transport is local.
324 ///
325 /// This can be used to optimize remote API usage, or to change program
326 /// behavior on local endpoints. When the client is instantiated by parsing
327 /// a URL or other external input, this value is set on a best-efforts
328 /// basis and may be incorrect.
329 #[inline]
330 pub const fn is_local(&self) -> bool {
331 self.is_local
332 }
333
334 /// Set the `is_local` flag.
335 #[inline]
336 pub const fn set_local(&mut self, is_local: bool) {
337 self.is_local = is_local;
338 }
339
340 /// Reserve a request ID value. This is used to generate request IDs.
341 #[inline]
342 fn increment_id(&self) -> u64 {
343 self.id.fetch_add(1, Ordering::Relaxed)
344 }
345
346 /// Reserve a request ID u64.
347 #[inline]
348 pub fn next_id(&self) -> Id {
349 self.increment_id().into()
350 }
351
352 /// Prepares an [`RpcCall`].
353 ///
354 /// This function reserves an ID for the request, however the request is not sent.
355 /// To send a request, await the returned [`RpcCall`].
356 ///
357 /// # Note
358 ///
359 /// Serialization is done lazily. It will not be performed until the call is awaited.
360 /// This means that if a serializer error occurs, it will not be caught until the call is
361 /// awaited.
362 #[doc(alias = "prepare")]
363 pub fn request<Params: RpcSend, Resp: RpcRecv>(
364 &self,
365 method: impl Into<Cow<'static, str>>,
366 params: Params,
367 ) -> RpcCall<Params, Resp> {
368 let request = self.make_request(method, params);
369 RpcCall::new(request, self.transport.clone())
370 }
371
372 /// Prepares an [`RpcCall`] with no parameters.
373 ///
374 /// See [`request`](Self::request) for more details.
375 pub fn request_noparams<Resp: RpcRecv>(
376 &self,
377 method: impl Into<Cow<'static, str>>,
378 ) -> RpcCall<NoParams, Resp> {
379 self.request(method, [])
380 }
381}
382
383#[cfg(feature = "pubsub")]
384mod pubsub_impl {
385 use super::*;
386 use alloy_pubsub::{PubSubConnect, RawSubscription, Subscription};
387 use alloy_transport::TransportResult;
388
389 impl RpcClientInner {
390 /// Get a [`RawSubscription`] for the given subscription ID.
391 ///
392 /// # Panics
393 ///
394 /// Panics if the transport does not support pubsub.
395 pub async fn get_raw_subscription(&self, id: alloy_primitives::B256) -> RawSubscription {
396 self.expect_pubsub_frontend().get_subscription(id).await.unwrap()
397 }
398
399 /// Get a [`Subscription`] for the given subscription ID.
400 ///
401 /// # Panics
402 ///
403 /// Panics if the transport does not support pubsub.
404 pub async fn get_subscription<T: serde::de::DeserializeOwned>(
405 &self,
406 id: alloy_primitives::B256,
407 ) -> Subscription<T> {
408 Subscription::from(self.get_raw_subscription(id).await)
409 }
410 }
411
412 impl RpcClient {
413 /// Connect to a transport via a [`PubSubConnect`] implementer.
414 pub async fn connect_pubsub<C: PubSubConnect>(connect: C) -> TransportResult<Self> {
415 ClientBuilder::default().pubsub(connect).await
416 }
417
418 /// Get the currently configured channel size. This is the number of items
419 /// to buffer in new subscription channels. Defaults to 16. See
420 /// [`tokio::sync::broadcast`] for a description of relevant
421 /// behavior.
422 ///
423 /// [`tokio::sync::broadcast`]: https://docs.rs/tokio/latest/tokio/sync/broadcast/index.html
424 ///
425 /// # Panics
426 ///
427 /// Panics if the transport does not support pubsub.
428 #[track_caller]
429 pub fn channel_size(&self) -> usize {
430 self.expect_pubsub_frontend().channel_size()
431 }
432
433 /// Set the channel size.
434 ///
435 /// # Panics
436 ///
437 /// Panics if the transport does not support pubsub.
438 #[track_caller]
439 pub fn set_channel_size(&self, size: usize) {
440 self.expect_pubsub_frontend().set_channel_size(size)
441 }
442 }
443}
444
445#[cfg(test)]
446mod tests {
447 use super::*;
448 use similar_asserts::assert_eq;
449
450 #[test]
451 fn test_client_with_poll_interval() {
452 let poll_interval = Duration::from_millis(5_000);
453 let client = RpcClient::new_http(reqwest::Url::parse("http://localhost").unwrap())
454 .with_poll_interval(poll_interval);
455 assert_eq!(client.poll_interval(), poll_interval);
456 }
457}