// ic_web3/api/eth_subscribe.rs
use crate::{
4 api::Namespace,
5 error, helpers,
6 types::{BlockHeader, Filter, Log, SyncState, H256},
7 DuplexTransport,
8};
9use futures::{
10 task::{Context, Poll},
11 Stream,
12};
13use pin_project::{pin_project, pinned_drop};
14use std::{marker::PhantomData, pin::Pin};
15
/// Namespace handle for the `eth_subscribe` family of calls.
///
/// Holds the transport that the `subscribe_*` methods use to issue
/// `eth_subscribe` requests.
#[derive(Clone, Debug)]
pub struct EthSubscribe<T> {
    /// Transport used to execute the subscription requests.
    transport: T,
}
21
22impl<T: DuplexTransport> Namespace<T> for EthSubscribe<T> {
23 fn new(transport: T) -> Self
24 where
25 Self: Sized,
26 {
27 EthSubscribe { transport }
28 }
29
30 fn transport(&self) -> &T {
31 &self.transport
32 }
33}
34
/// Identifier of a subscription, stored as the raw string it was created from.
///
/// Comparison and ordering are those of the wrapped `String`.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct SubscriptionId(String);
38
39impl From<String> for SubscriptionId {
40 fn from(s: String) -> Self {
41 SubscriptionId(s)
42 }
43}
44
45#[pin_project(PinnedDrop)]
49#[derive(Debug)]
50pub struct SubscriptionStream<T: DuplexTransport, I> {
51 transport: T,
52 id: SubscriptionId,
53 #[pin]
54 rx: T::NotificationStream,
55 _marker: PhantomData<I>,
56}
57
58impl<T: DuplexTransport, I> SubscriptionStream<T, I> {
59 fn new(transport: T, id: SubscriptionId) -> error::Result<Self> {
60 let rx = transport.subscribe(id.clone())?;
61 Ok(SubscriptionStream {
62 transport,
63 id,
64 rx,
65 _marker: PhantomData,
66 })
67 }
68
69 pub fn id(&self) -> &SubscriptionId {
71 &self.id
72 }
73
74 pub async fn unsubscribe(self) -> error::Result<bool> {
76 let &SubscriptionId(ref id) = &self.id;
77 let id = helpers::serialize(&id);
78 let response = self.transport.execute("eth_unsubscribe", vec![id]).await?;
79 helpers::decode(response)
80 }
81}
82
83impl<T, I> Stream for SubscriptionStream<T, I>
84where
85 T: DuplexTransport,
86 I: serde::de::DeserializeOwned,
87{
88 type Item = error::Result<I>;
89
90 fn poll_next(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> {
91 let this = self.project();
92 let x = ready!(this.rx.poll_next(ctx));
93 Poll::Ready(x.map(|result| serde_json::from_value(result).map_err(Into::into)))
94 }
95}
96
97#[pinned_drop]
98impl<T, I> PinnedDrop for SubscriptionStream<T, I>
99where
100 T: DuplexTransport,
101{
102 fn drop(self: Pin<&mut Self>) {
103 let _ = self.transport.unsubscribe(self.id().clone());
104 }
105}
106
107impl<T: DuplexTransport> EthSubscribe<T> {
108 pub async fn subscribe_new_heads(&self) -> error::Result<SubscriptionStream<T, BlockHeader>> {
110 let subscription = helpers::serialize(&&"newHeads");
111 let response = self.transport.execute("eth_subscribe", vec![subscription]).await?;
112 let id: String = helpers::decode(response)?;
113 SubscriptionStream::new(self.transport.clone(), SubscriptionId(id))
114 }
115
116 pub async fn subscribe_logs(&self, filter: Filter) -> error::Result<SubscriptionStream<T, Log>> {
118 let subscription = helpers::serialize(&&"logs");
119 let filter = helpers::serialize(&filter);
120 let response = self
121 .transport
122 .execute("eth_subscribe", vec![subscription, filter])
123 .await?;
124 let id: String = helpers::decode(response)?;
125 SubscriptionStream::new(self.transport.clone(), SubscriptionId(id))
126 }
127
128 pub async fn subscribe_new_pending_transactions(&self) -> error::Result<SubscriptionStream<T, H256>> {
130 let subscription = helpers::serialize(&&"newPendingTransactions");
131 let response = self.transport.execute("eth_subscribe", vec![subscription]).await?;
132 let id: String = helpers::decode(response)?;
133 SubscriptionStream::new(self.transport.clone(), SubscriptionId(id))
134 }
135
136 pub async fn subscribe_syncing(&self) -> error::Result<SubscriptionStream<T, SyncState>> {
138 let subscription = helpers::serialize(&&"syncing");
139 let response = self.transport.execute("eth_subscribe", vec![subscription]).await?;
140 let id: String = helpers::decode(response)?;
141 SubscriptionStream::new(self.transport.clone(), SubscriptionId(id))
142 }
143}