alloy_provider/provider/
subscription.rs1use alloy_json_rpc::{RpcRecv, RpcSend};
2use alloy_primitives::B256;
3use alloy_pubsub::Subscription;
4use alloy_rpc_client::{RpcCall, WeakClient};
5use alloy_transport::{TransportErrorKind, TransportResult};
6
7pub struct GetSubscription<P, R>
12where
13 P: RpcSend,
14 R: RpcRecv,
15{
16 client: WeakClient,
17 call: RpcCall<P, B256>,
18 channel_size: Option<usize>,
19 _marker: std::marker::PhantomData<fn() -> R>,
20}
21
22impl<P, R> GetSubscription<P, R>
23where
24 P: RpcSend,
25 R: RpcRecv,
26{
27 pub fn new(client: WeakClient, call: RpcCall<P, B256>) -> Self {
29 Self { client, call, channel_size: None, _marker: std::marker::PhantomData }
30 }
31
32 pub const fn channel_size(mut self, size: usize) -> Self {
34 self.channel_size = Some(size);
35 self
36 }
37}
38
39impl<P, R> core::fmt::Debug for GetSubscription<P, R>
40where
41 P: RpcSend,
42 R: RpcRecv,
43{
44 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
45 f.debug_struct("GetSubscription")
46 .field("channel_size", &self.channel_size)
47 .field("call", &self.call)
48 .finish()
49 }
50}
51
52impl<P, R> std::future::IntoFuture for GetSubscription<P, R>
53where
54 P: RpcSend + 'static,
55 R: RpcRecv,
56{
57 type Output = TransportResult<alloy_pubsub::Subscription<R>>;
58 type IntoFuture = futures_utils_wasm::BoxFuture<'static, Self::Output>;
59
60 fn into_future(self) -> Self::IntoFuture {
61 Box::pin(async move {
62 let client = self
63 .client
64 .upgrade()
65 .ok_or_else(|| TransportErrorKind::custom_str("client dropped"))?;
66 let pubsub = client.pubsub_frontend().ok_or(TransportErrorKind::PubsubUnavailable)?;
67
68 if let Some(size) = self.channel_size {
70 pubsub.set_channel_size(size);
71 }
72
73 let id = self.call.await?;
74
75 pubsub.get_subscription(id).await.map(Subscription::from)
76 })
77 }
78}