ic_web3/api/
eth_subscribe.rs

1//! `Eth` namespace, subscriptions
2
3use crate::{
4    api::Namespace,
5    error, helpers,
6    types::{BlockHeader, Filter, Log, SyncState, H256},
7    DuplexTransport,
8};
9use futures::{
10    task::{Context, Poll},
11    Stream,
12};
13use pin_project::{pin_project, pinned_drop};
14use std::{marker::PhantomData, pin::Pin};
15
16/// `Eth` namespace, subscriptions
17#[derive(Debug, Clone)]
18pub struct EthSubscribe<T> {
19    transport: T,
20}
21
22impl<T: DuplexTransport> Namespace<T> for EthSubscribe<T> {
23    fn new(transport: T) -> Self
24    where
25        Self: Sized,
26    {
27        EthSubscribe { transport }
28    }
29
30    fn transport(&self) -> &T {
31        &self.transport
32    }
33}
34
35/// ID of subscription returned from `eth_subscribe`
36#[derive(Debug, Clone, Eq, Ord, PartialEq, PartialOrd)]
37pub struct SubscriptionId(String);
38
39impl From<String> for SubscriptionId {
40    fn from(s: String) -> Self {
41        SubscriptionId(s)
42    }
43}
44
45/// Stream of notifications from a subscription
46/// Given a type deserializable from rpc::Value and a subscription id, yields items of that type as
47/// notifications are delivered.
48#[pin_project(PinnedDrop)]
49#[derive(Debug)]
50pub struct SubscriptionStream<T: DuplexTransport, I> {
51    transport: T,
52    id: SubscriptionId,
53    #[pin]
54    rx: T::NotificationStream,
55    _marker: PhantomData<I>,
56}
57
58impl<T: DuplexTransport, I> SubscriptionStream<T, I> {
59    fn new(transport: T, id: SubscriptionId) -> error::Result<Self> {
60        let rx = transport.subscribe(id.clone())?;
61        Ok(SubscriptionStream {
62            transport,
63            id,
64            rx,
65            _marker: PhantomData,
66        })
67    }
68
69    /// Return the ID of this subscription
70    pub fn id(&self) -> &SubscriptionId {
71        &self.id
72    }
73
74    /// Unsubscribe from the event represented by this stream
75    pub async fn unsubscribe(self) -> error::Result<bool> {
76        let &SubscriptionId(ref id) = &self.id;
77        let id = helpers::serialize(&id);
78        let response = self.transport.execute("eth_unsubscribe", vec![id]).await?;
79        helpers::decode(response)
80    }
81}
82
83impl<T, I> Stream for SubscriptionStream<T, I>
84where
85    T: DuplexTransport,
86    I: serde::de::DeserializeOwned,
87{
88    type Item = error::Result<I>;
89
90    fn poll_next(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> {
91        let this = self.project();
92        let x = ready!(this.rx.poll_next(ctx));
93        Poll::Ready(x.map(|result| serde_json::from_value(result).map_err(Into::into)))
94    }
95}
96
97#[pinned_drop]
98impl<T, I> PinnedDrop for SubscriptionStream<T, I>
99where
100    T: DuplexTransport,
101{
102    fn drop(self: Pin<&mut Self>) {
103        let _ = self.transport.unsubscribe(self.id().clone());
104    }
105}
106
107impl<T: DuplexTransport> EthSubscribe<T> {
108    /// Create a new heads subscription
109    pub async fn subscribe_new_heads(&self) -> error::Result<SubscriptionStream<T, BlockHeader>> {
110        let subscription = helpers::serialize(&&"newHeads");
111        let response = self.transport.execute("eth_subscribe", vec![subscription]).await?;
112        let id: String = helpers::decode(response)?;
113        SubscriptionStream::new(self.transport.clone(), SubscriptionId(id))
114    }
115
116    /// Create a logs subscription
117    pub async fn subscribe_logs(&self, filter: Filter) -> error::Result<SubscriptionStream<T, Log>> {
118        let subscription = helpers::serialize(&&"logs");
119        let filter = helpers::serialize(&filter);
120        let response = self
121            .transport
122            .execute("eth_subscribe", vec![subscription, filter])
123            .await?;
124        let id: String = helpers::decode(response)?;
125        SubscriptionStream::new(self.transport.clone(), SubscriptionId(id))
126    }
127
128    /// Create a pending transactions subscription
129    pub async fn subscribe_new_pending_transactions(&self) -> error::Result<SubscriptionStream<T, H256>> {
130        let subscription = helpers::serialize(&&"newPendingTransactions");
131        let response = self.transport.execute("eth_subscribe", vec![subscription]).await?;
132        let id: String = helpers::decode(response)?;
133        SubscriptionStream::new(self.transport.clone(), SubscriptionId(id))
134    }
135
136    /// Create a sync status subscription
137    pub async fn subscribe_syncing(&self) -> error::Result<SubscriptionStream<T, SyncState>> {
138        let subscription = helpers::serialize(&&"syncing");
139        let response = self.transport.execute("eth_subscribe", vec![subscription]).await?;
140        let id: String = helpers::decode(response)?;
141        SubscriptionStream::new(self.transport.clone(), SubscriptionId(id))
142    }
143}