1use crate::{poller::PollerBuilder, BatchRequest, ClientBuilder, RpcCall};
2use alloy_json_rpc::{Id, Request, RpcRecv, RpcSend};
3use alloy_transport::{mock::Asserter, BoxTransport, IntoBoxTransport};
4use std::{
5 borrow::Cow,
6 ops::Deref,
7 sync::{
8 atomic::{AtomicU64, Ordering},
9 Arc, Weak,
10 },
11 time::Duration,
12};
13use tower::{layer::util::Identity, ServiceBuilder};
14
/// A non-owning ([`Weak`]) handle to an [`RpcClientInner`].
///
/// Obtained from [`RpcClient::get_weak`], which downgrades the client's internal `Arc`.
pub type WeakClient = Weak<RpcClientInner>;
17
/// A plain borrow of an [`RpcClientInner`].
///
/// Returned by [`RpcClient::get_ref`].
pub type ClientRef<'a> = &'a RpcClientInner;
20
/// Parameter type for requests that carry no parameters: a zero-length array of `()`.
///
/// Used by [`RpcClientInner::request_noparams`], which passes `[]` as the params value.
pub type NoParams = [(); 0];
23
// Type of the optional pubsub slot stored in `RpcClientInner`.
//
// With the `pubsub` feature enabled the slot can hold a real `PubSubFrontend`.
#[cfg(feature = "pubsub")]
type MaybePubsub = Option<alloy_pubsub::PubSubFrontend>;

// Without the `pubsub` feature the slot keeps the same `Option` shape but carries a unit
// placeholder, so code that passes `None` compiles under either configuration.
#[cfg(not(feature = "pubsub"))]
type MaybePubsub = Option<()>;
29
#[derive(Debug)]
/// A JSON-RPC client handle.
///
/// This is a thin newtype around `Arc<RpcClientInner>`: cloning it clones the `Arc`, and it
/// dereferences to [`RpcClientInner`], so the inner client's methods can be called directly
/// on an `RpcClient`.
pub struct RpcClient(Arc<RpcClientInner>);
38
39impl Clone for RpcClient {
40 fn clone(&self) -> Self {
41 Self(Arc::clone(&self.0))
42 }
43}
44
45impl RpcClient {
46 pub const fn builder() -> ClientBuilder<Identity> {
48 ClientBuilder { builder: ServiceBuilder::new() }
49 }
50}
51
52impl RpcClient {
53 pub fn new(t: impl IntoBoxTransport, is_local: bool) -> Self {
55 Self::new_maybe_pubsub(t, is_local, None)
56 }
57
58 pub fn mocked(asserter: Asserter) -> Self {
61 Self::new(alloy_transport::mock::MockTransport::new(asserter), true)
62 }
63
64 #[cfg(feature = "reqwest")]
66 pub fn new_http(url: reqwest::Url) -> Self {
67 let http = alloy_transport_http::Http::new(url);
68 let is_local = http.guess_local();
69 Self::new(http, is_local)
70 }
71
72 pub(crate) fn new_maybe_pubsub(
74 t: impl IntoBoxTransport,
75 is_local: bool,
76 pubsub: MaybePubsub,
77 ) -> Self {
78 Self(Arc::new(RpcClientInner::new_maybe_pubsub(t, is_local, pubsub)))
79 }
80
81 pub(crate) fn new_layered<F, T, R>(is_local: bool, main_transport: T, layer: F) -> Self
91 where
92 F: FnOnce(T) -> R,
93 T: IntoBoxTransport,
94 R: IntoBoxTransport,
95 {
96 #[cfg(feature = "pubsub")]
97 {
98 let t = main_transport.clone().into_box_transport();
99 let maybe_pubsub = t.as_any().downcast_ref::<alloy_pubsub::PubSubFrontend>().cloned();
100 Self::new_maybe_pubsub(layer(main_transport), is_local, maybe_pubsub)
101 }
102
103 #[cfg(not(feature = "pubsub"))]
104 Self::new(layer(main_transport), is_local)
105 }
106
107 pub fn from_inner(inner: RpcClientInner) -> Self {
109 Self(Arc::new(inner))
110 }
111
112 pub const fn inner(&self) -> &Arc<RpcClientInner> {
114 &self.0
115 }
116
117 pub fn into_inner(self) -> Arc<RpcClientInner> {
119 self.0
120 }
121
122 pub fn get_weak(&self) -> WeakClient {
124 Arc::downgrade(&self.0)
125 }
126
127 pub fn get_ref(&self) -> ClientRef<'_> {
129 &self.0
130 }
131
132 pub fn with_poll_interval(self, poll_interval: Duration) -> Self {
137 self.inner().set_poll_interval(poll_interval);
138 self
139 }
140
141 pub fn prepare_static_poller<Params, Resp>(
145 &self,
146 method: impl Into<Cow<'static, str>>,
147 params: Params,
148 ) -> PollerBuilder<Params, Resp>
149 where
150 Params: RpcSend + 'static,
151 Resp: RpcRecv + Clone,
152 {
153 PollerBuilder::new(self.get_weak(), method, params)
154 }
155
156 #[deprecated(since = "0.9.0", note = "`RpcClient` is now always boxed")]
158 #[allow(clippy::missing_const_for_fn)]
159 pub fn boxed(self) -> Self {
160 self
161 }
162
163 #[inline]
165 pub fn new_batch(&self) -> BatchRequest<'_> {
166 BatchRequest::new(&self.0)
167 }
168}
169
170impl Deref for RpcClient {
171 type Target = RpcClientInner;
172
173 #[inline]
174 fn deref(&self) -> &Self::Target {
175 &self.0
176 }
177}
178
#[derive(Debug)]
/// The state shared by all clones of an [`RpcClient`].
///
/// Holds the boxed transport, an optional pubsub frontend, the `is_local` flag, the request
/// ID counter and the poll interval. The counter and the interval are atomics, so they can
/// be updated through a shared reference.
pub struct RpcClientInner {
    /// The boxed transport; a clone of it is handed to every [`RpcCall`] created by
    /// [`RpcClientInner::request`].
    pub(crate) transport: BoxTransport,
    /// Optional pubsub frontend stored next to the transport.
    ///
    /// Without the `pubsub` feature this is an `Option<()>` placeholder.
    // In the code visible here the field is only read when the `pubsub` feature is on,
    // which is presumably why the unused-field lint is silenced.
    #[allow(unused)]
    pub(crate) pubsub: MaybePubsub,
    /// Whether the transport is considered local; selects the initial poll interval.
    pub(crate) is_local: bool,
    /// Counter from which request IDs are drawn.
    pub(crate) id: AtomicU64,
    /// Poll interval, stored as a number of milliseconds.
    pub(crate) poll_interval: AtomicU64,
}
210
211impl RpcClientInner {
212 #[inline]
217 pub fn new(t: impl IntoBoxTransport, is_local: bool) -> Self {
218 Self {
219 transport: t.into_box_transport(),
220 pubsub: None,
221 is_local,
222 id: AtomicU64::new(0),
223 poll_interval: if is_local { AtomicU64::new(250) } else { AtomicU64::new(7000) },
224 }
225 }
226
227 pub(crate) fn new_maybe_pubsub(
230 t: impl IntoBoxTransport,
231 is_local: bool,
232 pubsub: MaybePubsub,
233 ) -> Self {
234 Self { pubsub, ..Self::new(t.into_box_transport(), is_local) }
235 }
236
237 #[inline]
239 pub fn with_id(self, id: u64) -> Self {
240 Self { id: AtomicU64::new(id), ..self }
241 }
242
243 pub fn poll_interval(&self) -> Duration {
245 Duration::from_millis(self.poll_interval.load(Ordering::Relaxed))
246 }
247
248 pub fn set_poll_interval(&self, poll_interval: Duration) {
251 self.poll_interval.store(poll_interval.as_millis() as u64, Ordering::Relaxed);
252 }
253
254 #[inline]
256 pub const fn transport(&self) -> &BoxTransport {
257 &self.transport
258 }
259
260 #[inline]
262 pub fn transport_mut(&mut self) -> &mut BoxTransport {
263 &mut self.transport
264 }
265
266 #[inline]
268 pub fn into_transport(self) -> BoxTransport {
269 self.transport
270 }
271
272 #[cfg(feature = "pubsub")]
274 #[inline]
275 #[track_caller]
276 pub fn pubsub_frontend(&self) -> Option<&alloy_pubsub::PubSubFrontend> {
277 if let Some(pubsub) = &self.pubsub {
278 return Some(pubsub);
279 }
280 self.transport.as_any().downcast_ref::<alloy_pubsub::PubSubFrontend>()
281 }
282
283 #[cfg(feature = "pubsub")]
289 #[inline]
290 #[track_caller]
291 pub fn expect_pubsub_frontend(&self) -> &alloy_pubsub::PubSubFrontend {
292 self.pubsub_frontend().expect("called pubsub_frontend on a non-pubsub transport")
293 }
294
295 #[inline]
301 pub fn make_request<Params: RpcSend>(
302 &self,
303 method: impl Into<Cow<'static, str>>,
304 params: Params,
305 ) -> Request<Params> {
306 Request::new(method, self.next_id(), params)
307 }
308
309 #[inline]
316 pub const fn is_local(&self) -> bool {
317 self.is_local
318 }
319
320 #[inline]
322 pub fn set_local(&mut self, is_local: bool) {
323 self.is_local = is_local;
324 }
325
326 #[inline]
328 fn increment_id(&self) -> u64 {
329 self.id.fetch_add(1, Ordering::Relaxed)
330 }
331
332 #[inline]
334 pub fn next_id(&self) -> Id {
335 self.increment_id().into()
336 }
337
338 #[doc(alias = "prepare")]
349 pub fn request<Params: RpcSend, Resp: RpcRecv>(
350 &self,
351 method: impl Into<Cow<'static, str>>,
352 params: Params,
353 ) -> RpcCall<Params, Resp> {
354 let request = self.make_request(method, params);
355 RpcCall::new(request, self.transport.clone())
356 }
357
358 pub fn request_noparams<Resp: RpcRecv>(
362 &self,
363 method: impl Into<Cow<'static, str>>,
364 ) -> RpcCall<NoParams, Resp> {
365 self.request(method, [])
366 }
367
368 #[deprecated(since = "0.9.0", note = "`RpcClientInner` is now always boxed")]
371 #[allow(clippy::missing_const_for_fn)]
372 pub fn boxed(self) -> Self {
373 self
374 }
375}
376
#[cfg(feature = "pubsub")]
mod pubsub_impl {
    use super::*;
    use alloy_pubsub::{PubSubConnect, RawSubscription, Subscription};
    use alloy_transport::TransportResult;

    impl RpcClientInner {
        /// Fetches the raw subscription registered under `id` from the pubsub frontend.
        ///
        /// # Panics
        ///
        /// Panics if the client has no pubsub frontend, or if the frontend's
        /// `get_subscription` call does not yield a subscription.
        pub async fn get_raw_subscription(&self, id: alloy_primitives::B256) -> RawSubscription {
            let frontend = self.expect_pubsub_frontend();
            frontend.get_subscription(id).await.unwrap()
        }

        /// Fetches the subscription registered under `id` and converts it into a typed
        /// [`Subscription`].
        ///
        /// # Panics
        ///
        /// Panics under the same conditions as [`Self::get_raw_subscription`].
        pub async fn get_subscription<T: serde::de::DeserializeOwned>(
            &self,
            id: alloy_primitives::B256,
        ) -> Subscription<T> {
            let raw = self.get_raw_subscription(id).await;
            Subscription::from(raw)
        }
    }

    impl RpcClient {
        /// Builds a client from the pubsub connector `connect`, using a default
        /// [`ClientBuilder`].
        pub async fn connect_pubsub<C: PubSubConnect>(connect: C) -> TransportResult<Self> {
            let builder = ClientBuilder::default();
            builder.pubsub(connect).await
        }

        /// Returns the channel size reported by the pubsub frontend.
        ///
        /// # Panics
        ///
        /// Panics if the client has no pubsub frontend.
        #[track_caller]
        pub fn channel_size(&self) -> usize {
            let frontend = self.expect_pubsub_frontend();
            frontend.channel_size()
        }

        /// Sets the channel size on the pubsub frontend.
        ///
        /// # Panics
        ///
        /// Panics if the client has no pubsub frontend.
        #[track_caller]
        pub fn set_channel_size(&self, size: usize) {
            let frontend = self.expect_pubsub_frontend();
            frontend.set_channel_size(size)
        }
    }
}
438
#[cfg(test)]
mod tests {
    use super::*;
    use similar_asserts::assert_eq;

    /// `with_poll_interval` must override the default interval chosen at construction.
    #[test]
    fn test_client_with_poll_interval() {
        let url = reqwest::Url::parse("http://localhost").unwrap();
        let expected = Duration::from_millis(5_000);

        let client = RpcClient::new_http(url).with_poll_interval(expected);

        assert_eq!(client.poll_interval(), expected);
    }
}