ethers_providers/rpc/
pubsub.rs1use crate::{JsonRpcClient, Middleware, Provider};
2use ethers_core::types::U256;
3use futures_util::stream::Stream;
4use pin_project::{pin_project, pinned_drop};
5use serde::de::DeserializeOwned;
6use serde_json::value::RawValue;
7use std::{
8 collections::VecDeque,
9 marker::PhantomData,
10 pin::Pin,
11 task::{Context, Poll},
12};
13use tracing::error;
14
15pub trait PubsubClient: JsonRpcClient {
17 type NotificationStream: futures_core::Stream<Item = Box<RawValue>> + Send + Unpin;
19
20 fn subscribe<T: Into<U256>>(&self, id: T) -> Result<Self::NotificationStream, Self::Error>;
22
23 fn unsubscribe<T: Into<U256>>(&self, id: T) -> Result<(), Self::Error>;
25}
26
27#[must_use = "subscriptions do nothing unless you stream them"]
28#[pin_project(PinnedDrop)]
29pub struct SubscriptionStream<'a, P: PubsubClient, R: DeserializeOwned> {
31 pub id: U256,
33
34 loaded_elements: VecDeque<R>,
35
36 pub(crate) provider: &'a Provider<P>,
37
38 #[pin]
39 rx: P::NotificationStream,
40
41 ret: PhantomData<R>,
42}
43
44impl<'a, P, R> SubscriptionStream<'a, P, R>
45where
46 P: PubsubClient,
47 R: DeserializeOwned,
48{
49 pub fn new(id: U256, provider: &'a Provider<P>) -> Result<Self, P::Error> {
57 let rx = provider.as_ref().subscribe(id)?;
59 Ok(Self { id, provider, rx, ret: PhantomData, loaded_elements: VecDeque::new() })
60 }
61
62 pub async fn unsubscribe(&self) -> Result<bool, crate::ProviderError> {
64 self.provider.unsubscribe(self.id).await
65 }
66
67 pub fn set_loaded_elements(&mut self, loaded_elements: VecDeque<R>) {
75 self.loaded_elements = loaded_elements;
76 }
77}
78
79impl<'a, P, R> Stream for SubscriptionStream<'a, P, R>
83where
84 P: PubsubClient,
85 R: DeserializeOwned,
86{
87 type Item = R;
88
89 fn poll_next(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> {
90 if !self.loaded_elements.is_empty() {
91 let next_element = self.get_mut().loaded_elements.pop_front();
92 return Poll::Ready(next_element)
93 }
94
95 let mut this = self.project();
96 loop {
97 return match futures_util::ready!(this.rx.as_mut().poll_next(ctx)) {
98 Some(item) => match serde_json::from_str(item.get()) {
99 Ok(res) => Poll::Ready(Some(res)),
100 Err(err) => {
101 error!("failed to deserialize item {:?}", err);
102 continue
103 }
104 },
105 None => Poll::Ready(None),
106 }
107 }
108 }
109}
110
111#[pinned_drop]
112impl<P, R> PinnedDrop for SubscriptionStream<'_, P, R>
113where
114 P: PubsubClient,
115 R: DeserializeOwned,
116{
117 fn drop(self: Pin<&mut Self>) {
118 let _ = (*self.provider).as_ref().unsubscribe(self.id);
122 }
123}