alloy_provider/provider/
subscription.rs

1use alloy_json_rpc::{RpcRecv, RpcSend};
2use alloy_primitives::B256;
3use alloy_pubsub::Subscription;
4use alloy_rpc_client::{RpcCall, WeakClient};
5use alloy_transport::{TransportErrorKind, TransportResult};
6
7/// A general-purpose subscription request builder
8///
9/// This struct allows configuring subscription parameters and channel size
10/// before initiating a request to subscribe to Ethereum events.
11pub struct GetSubscription<P, R>
12where
13    P: RpcSend,
14    R: RpcRecv,
15{
16    client: WeakClient,
17    call: RpcCall<P, B256>,
18    channel_size: Option<usize>,
19    _marker: std::marker::PhantomData<fn() -> R>,
20}
21
22impl<P, R> GetSubscription<P, R>
23where
24    P: RpcSend,
25    R: RpcRecv,
26{
27    /// Creates a new [`GetSubscription`] instance
28    pub fn new(client: WeakClient, call: RpcCall<P, B256>) -> Self {
29        Self { client, call, channel_size: None, _marker: std::marker::PhantomData }
30    }
31
32    /// Set the channel_size for the subscription stream.
33    pub const fn channel_size(mut self, size: usize) -> Self {
34        self.channel_size = Some(size);
35        self
36    }
37}
38
39impl<P, R> core::fmt::Debug for GetSubscription<P, R>
40where
41    P: RpcSend,
42    R: RpcRecv,
43{
44    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
45        f.debug_struct("GetSubscription")
46            .field("channel_size", &self.channel_size)
47            .field("call", &self.call)
48            .finish()
49    }
50}
51
52impl<P, R> std::future::IntoFuture for GetSubscription<P, R>
53where
54    P: RpcSend + 'static,
55    R: RpcRecv,
56{
57    type Output = TransportResult<alloy_pubsub::Subscription<R>>;
58    type IntoFuture = futures_utils_wasm::BoxFuture<'static, Self::Output>;
59
60    fn into_future(self) -> Self::IntoFuture {
61        Box::pin(async move {
62            let client = self
63                .client
64                .upgrade()
65                .ok_or_else(|| TransportErrorKind::custom_str("client dropped"))?;
66            let pubsub = client.pubsub_frontend().ok_or(TransportErrorKind::PubsubUnavailable)?;
67
68            // Set config channel size if any
69            if let Some(size) = self.channel_size {
70                pubsub.set_channel_size(size);
71            }
72
73            let id = self.call.await?;
74
75            pubsub.get_subscription(id).await.map(Subscription::from)
76        })
77    }
78}