ethers_providers/rpc/
pubsub.rs

1use crate::{JsonRpcClient, Middleware, Provider};
2use ethers_core::types::U256;
3use futures_util::stream::Stream;
4use pin_project::{pin_project, pinned_drop};
5use serde::de::DeserializeOwned;
6use serde_json::value::RawValue;
7use std::{
8    collections::VecDeque,
9    marker::PhantomData,
10    pin::Pin,
11    task::{Context, Poll},
12};
13use tracing::error;
14
15/// A transport implementation supporting pub sub subscriptions.
16pub trait PubsubClient: JsonRpcClient {
17    /// The type of stream this transport returns
18    type NotificationStream: futures_core::Stream<Item = Box<RawValue>> + Send + Unpin;
19
20    /// Add a subscription to this transport
21    fn subscribe<T: Into<U256>>(&self, id: T) -> Result<Self::NotificationStream, Self::Error>;
22
23    /// Remove a subscription from this transport
24    fn unsubscribe<T: Into<U256>>(&self, id: T) -> Result<(), Self::Error>;
25}
26
27#[must_use = "subscriptions do nothing unless you stream them"]
28#[pin_project(PinnedDrop)]
29/// Streams data from an installed filter via `eth_subscribe`
30pub struct SubscriptionStream<'a, P: PubsubClient, R: DeserializeOwned> {
31    /// The subscription's installed id on the ethereum node
32    pub id: U256,
33
34    loaded_elements: VecDeque<R>,
35
36    pub(crate) provider: &'a Provider<P>,
37
38    #[pin]
39    rx: P::NotificationStream,
40
41    ret: PhantomData<R>,
42}
43
44impl<'a, P, R> SubscriptionStream<'a, P, R>
45where
46    P: PubsubClient,
47    R: DeserializeOwned,
48{
49    /// Creates a new subscription stream for the provided subscription id.
50    ///
51    /// ### Note
52    /// Most providers treat `SubscriptionStream` IDs as global singletons.
53    /// Instantiating this directly with a known ID will likely cause any
54    /// existing streams with that ID to end. To avoid this, start a new stream
55    /// using [`Provider::subscribe`] instead of `SubscriptionStream::new`.
56    pub fn new(id: U256, provider: &'a Provider<P>) -> Result<Self, P::Error> {
57        // Call the underlying PubsubClient's subscribe
58        let rx = provider.as_ref().subscribe(id)?;
59        Ok(Self { id, provider, rx, ret: PhantomData, loaded_elements: VecDeque::new() })
60    }
61
62    /// Unsubscribes from the subscription.
63    pub async fn unsubscribe(&self) -> Result<bool, crate::ProviderError> {
64        self.provider.unsubscribe(self.id).await
65    }
66
67    /// Set the loaded elements buffer. This buffer contains logs waiting for
68    /// the consumer to read. Setting the buffer can be used to add logs
69    /// without receiving them from the RPC node
70    ///
71    /// ### Warning
72    ///
73    /// Setting the buffer will drop any logs in the current buffer.
74    pub fn set_loaded_elements(&mut self, loaded_elements: VecDeque<R>) {
75        self.loaded_elements = loaded_elements;
76    }
77}
78
79// Each subscription item is a serde_json::Value which must be decoded to the
80// subscription's return type.
81// TODO: Can this be replaced with an `rx.map` in the constructor?
82impl<'a, P, R> Stream for SubscriptionStream<'a, P, R>
83where
84    P: PubsubClient,
85    R: DeserializeOwned,
86{
87    type Item = R;
88
89    fn poll_next(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> {
90        if !self.loaded_elements.is_empty() {
91            let next_element = self.get_mut().loaded_elements.pop_front();
92            return Poll::Ready(next_element)
93        }
94
95        let mut this = self.project();
96        loop {
97            return match futures_util::ready!(this.rx.as_mut().poll_next(ctx)) {
98                Some(item) => match serde_json::from_str(item.get()) {
99                    Ok(res) => Poll::Ready(Some(res)),
100                    Err(err) => {
101                        error!("failed to deserialize item {:?}", err);
102                        continue
103                    }
104                },
105                None => Poll::Ready(None),
106            }
107        }
108    }
109}
110
111#[pinned_drop]
112impl<P, R> PinnedDrop for SubscriptionStream<'_, P, R>
113where
114    P: PubsubClient,
115    R: DeserializeOwned,
116{
117    fn drop(self: Pin<&mut Self>) {
118        // on drop it removes the handler from the websocket so that it stops
119        // getting populated. We need to call `unsubscribe` explicitly to cancel
120        // the subscription
121        let _ = (*self.provider).as_ref().unsubscribe(self.id);
122    }
123}