alloy_provider/provider/
root.rs1use crate::{
2 blocks::NewBlocks,
3 heart::{Heartbeat, HeartbeatHandle},
4 Identity, ProviderBuilder,
5};
6use alloy_network::{Ethereum, Network};
7use alloy_rpc_client::{BuiltInConnectionString, ClientBuilder, ClientRef, RpcClient, WeakClient};
8use alloy_transport::{TransportConnect, TransportError};
9use std::{
10 fmt,
11 marker::PhantomData,
12 sync::{Arc, OnceLock},
13};
14
15#[cfg(feature = "pubsub")]
16use alloy_pubsub::{PubSubFrontend, Subscription};
17
18pub struct RootProvider<N: Network = Ethereum> {
21 pub(crate) inner: Arc<RootProviderInner<N>>,
23}
24
25impl<N: Network> Clone for RootProvider<N> {
26 fn clone(&self) -> Self {
27 Self { inner: self.inner.clone() }
28 }
29}
30
31impl<N: Network> fmt::Debug for RootProvider<N> {
32 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
33 f.debug_struct("RootProvider").field("client", &self.inner.client).finish_non_exhaustive()
34 }
35}
36
37pub fn builder<N: Network>() -> ProviderBuilder<Identity, Identity, N> {
40 ProviderBuilder::default()
41}
42
43impl<N: Network> RootProvider<N> {
44 #[cfg(feature = "reqwest")]
46 pub fn new_http(url: url::Url) -> Self {
47 Self::new(RpcClient::new_http(url))
48 }
49
50 pub fn new(client: RpcClient) -> Self {
52 Self { inner: Arc::new(RootProviderInner::new(client)) }
53 }
54
55 pub async fn connect(s: &str) -> Result<Self, TransportError> {
59 Self::connect_with(s.parse::<BuiltInConnectionString>()?).await
60 }
61
62 pub async fn connect_with<C: TransportConnect>(conn: C) -> Result<Self, TransportError> {
64 ClientBuilder::default().connect_with(conn).await.map(Self::new)
65 }
66}
67
68impl<N: Network> RootProvider<N> {
69 #[cfg(feature = "pubsub")]
71 pub async fn get_subscription<R: alloy_json_rpc::RpcRecv>(
72 &self,
73 id: alloy_primitives::B256,
74 ) -> alloy_transport::TransportResult<Subscription<R>> {
75 self.pubsub_frontend()?.get_subscription(id).await.map(Subscription::from)
76 }
77
78 #[cfg(feature = "pubsub")]
80 pub fn unsubscribe(&self, id: alloy_primitives::B256) -> alloy_transport::TransportResult<()> {
81 self.pubsub_frontend()?.unsubscribe(id)
82 }
83
84 #[cfg(feature = "pubsub")]
85 pub(crate) fn pubsub_frontend(&self) -> alloy_transport::TransportResult<&PubSubFrontend> {
86 self.inner
87 .client_ref()
88 .pubsub_frontend()
89 .ok_or_else(alloy_transport::TransportErrorKind::pubsub_unavailable)
90 }
91
92 #[inline]
93 pub(crate) fn get_heart(&self) -> &HeartbeatHandle {
94 self.inner.heart.get_or_init(|| {
95 let new_blocks = NewBlocks::<N>::new(self.inner.weak_client());
96 let paused = new_blocks.paused.clone();
97 let stream = new_blocks.into_stream();
98 Heartbeat::<N, _>::new(Box::pin(stream), paused).spawn()
99 })
100 }
101}
102
103pub(crate) struct RootProviderInner<N: Network = Ethereum> {
106 client: RpcClient,
107 heart: OnceLock<HeartbeatHandle>,
108 _network: PhantomData<N>,
109}
110
111impl<N: Network> Clone for RootProviderInner<N> {
112 fn clone(&self) -> Self {
113 Self { client: self.client.clone(), heart: self.heart.clone(), _network: PhantomData }
114 }
115}
116
117impl<N: Network> RootProviderInner<N> {
118 pub(crate) fn new(client: RpcClient) -> Self {
119 Self { client, heart: Default::default(), _network: PhantomData }
120 }
121
122 pub(crate) fn weak_client(&self) -> WeakClient {
123 self.client.get_weak()
124 }
125
126 pub(crate) fn client_ref(&self) -> ClientRef<'_> {
127 self.client.get_ref()
128 }
129}