Skip to main content

hiero_sdk/mirror_query/
subscribe.rs

1// SPDX-License-Identifier: Apache-2.0
2
3use async_stream::stream;
4use backoff::backoff::Backoff;
5use backoff::ExponentialBackoff;
6use futures_core::future::BoxFuture;
7use futures_core::Stream;
8use futures_util::StreamExt;
9use tokio::time::sleep;
10use tonic::transport::Channel;
11use tonic::Status;
12
13use crate::mirror_query::AnyMirrorQueryData;
14use crate::{
15    Client,
16    Error,
17    MirrorQuery,
18};
19
20impl<D> MirrorQuery<D>
21where
22    D: MirrorQueryExecute,
23{
24    /// Execute this query against the provided client of the Hiero network.
25    // todo:
26    #[allow(clippy::missing_errors_doc)]
27    pub async fn execute(&mut self, client: &Client) -> crate::Result<D::Response> {
28        self.execute_with_optional_timeout(client, None).await
29    }
30
31    pub(crate) async fn execute_with_optional_timeout(
32        &self,
33        client: &Client,
34        timeout: Option<std::time::Duration>,
35    ) -> crate::Result<D::Response> {
36        self.data.execute_with_optional_timeout(&self.common, client, timeout).await
37    }
38
39    /// Execute this query against the provided client of the Hiero network.
40    ///
41    /// Note that `timeout` is the connection timeout.
42    // todo:
43    #[allow(clippy::missing_errors_doc)]
44    pub async fn execute_with_timeout(
45        &mut self,
46        client: &Client,
47        timeout: std::time::Duration,
48    ) -> crate::Result<D::Response> {
49        self.execute_with_optional_timeout(client, Some(timeout)).await
50    }
51
52    /// Subscribe to this query with the provided client of the Hiero network.
53    pub fn subscribe<'a>(&self, client: &'a Client) -> D::ItemStream<'a> {
54        self.subscribe_with_optional_timeout(client, None)
55    }
56
57    /// Subscribe to this query with the provided client of the Hiero network.
58    ///
59    /// Note that `timeout` is the connection timeout.
60    pub fn subscribe_with_timeout<'a>(
61        &self,
62        client: &'a Client,
63        timeout: std::time::Duration,
64    ) -> D::ItemStream<'a> {
65        self.subscribe_with_optional_timeout(client, Some(timeout))
66    }
67
68    pub(crate) fn subscribe_with_optional_timeout<'a>(
69        &self,
70        client: &'a Client,
71        timeout: Option<std::time::Duration>,
72    ) -> D::ItemStream<'a> {
73        self.data.subscribe_with_optional_timeout(&self.common, client, timeout)
74    }
75}
76
77pub trait MirrorQueryExecute: Sized + Into<AnyMirrorQueryData> + Send + Sync {
78    type Item;
79    type Response;
80    type ItemStream<'a>: Stream<Item = crate::Result<Self::Item>> + 'a
81    where
82        Self: 'a;
83
84    fn subscribe_with_optional_timeout<'a>(
85        &self,
86        params: &crate::mirror_query::MirrorQueryCommon,
87        client: &'a crate::Client,
88        timeout: Option<std::time::Duration>,
89    ) -> Self::ItemStream<'a>
90    where
91        Self: 'a;
92
93    fn execute_with_optional_timeout<'a>(
94        &'a self,
95        params: &'a super::MirrorQueryCommon,
96        client: &'a Client,
97        timeout: Option<std::time::Duration>,
98    ) -> BoxFuture<'a, crate::Result<Self::Response>>;
99}
100
101impl<T> MirrorQueryExecute for T
102where
103    T: MirrorRequest + Sync + Clone + Into<AnyMirrorQueryData>,
104{
105    type Item = <Self as MirrorRequest>::Item;
106
107    type Response = <Self as MirrorRequest>::Response;
108
109    type ItemStream<'a>
110        = <Self as MirrorRequest>::ItemStream<'a>
111    where
112        Self: 'a;
113
114    fn subscribe_with_optional_timeout<'a>(
115        &self,
116        _params: &crate::mirror_query::MirrorQueryCommon,
117        client: &'a crate::Client,
118        timeout: Option<std::time::Duration>,
119    ) -> Self::ItemStream<'a>
120    where
121        Self: 'a,
122    {
123        let timeout = timeout.or_else(|| client.request_timeout()).unwrap_or_else(|| {
124            std::time::Duration::from_millis(backoff::default::MAX_ELAPSED_TIME_MILLIS)
125        });
126
127        // note: we don't care about keeping the mirrornet around, so, we just take the channel (which is arc-like)
128        let channel = client.mirrornet().load().channel(client.grpc_deadline());
129
130        Self::make_item_stream(crate::mirror_query::subscribe(channel, timeout, self.clone()))
131    }
132
133    fn execute_with_optional_timeout<'a>(
134        &'a self,
135        _params: &'a crate::mirror_query::MirrorQueryCommon,
136        client: &crate::Client,
137        timeout: Option<std::time::Duration>,
138    ) -> BoxFuture<'a, crate::Result<Self::Response>> {
139        let timeout = timeout.or_else(|| client.request_timeout()).unwrap_or_else(|| {
140            std::time::Duration::from_millis(backoff::default::MAX_ELAPSED_TIME_MILLIS)
141        });
142
143        // note: we don't care about keeping the mirrornet around, so, we just take the channel (which is arc-like)
144        let channel = client.mirrornet().load().channel(client.grpc_deadline());
145
146        Self::try_collect(crate::mirror_query::subscribe(channel, timeout, self.clone()))
147    }
148}
149
150pub trait MirrorRequest: Send {
151    type GrpcItem: Send;
152    type ConnectStream: Stream<Item = tonic::Result<Self::GrpcItem>> + Send;
153
154    type Item;
155    type Response;
156    type Context: Default + Send + Sync;
157
158    type ItemStream<'a>: Stream<Item = crate::Result<Self::Item>> + 'a;
159
160    fn connect(
161        &self,
162        context: &Self::Context,
163        channel: Channel,
164    ) -> BoxFuture<'_, tonic::Result<Self::ConnectStream>>;
165
166    /// Return `true` to retry establishing the stream, up to a configurable maximum timeout.
167    #[allow(unused_variables)]
168    fn should_retry(&self, status_code: tonic::Code) -> bool {
169        false
170    }
171
172    fn make_item_stream<'a, S>(stream: S) -> Self::ItemStream<'a>
173    where
174        S: Stream<Item = crate::Result<Self::GrpcItem>> + Send + 'a;
175
176    fn update_context(context: &mut Self::Context, item: &Self::GrpcItem);
177
178    fn try_collect<'a, S>(stream: S) -> BoxFuture<'a, crate::Result<Self::Response>>
179    where
180        S: Stream<Item = crate::Result<Self::GrpcItem>> + Send + 'a;
181}
182
183pub(crate) fn subscribe<I: Send, R: MirrorRequest<GrpcItem = I> + Send + Sync>(
184    channel: Channel,
185    timeout: std::time::Duration,
186    request: R,
187) -> impl Stream<Item = crate::Result<I>> + Send {
188    stream! {
189        let request = request;
190
191        let mut backoff = ExponentialBackoff {
192            max_elapsed_time: Some(timeout),
193            ..ExponentialBackoff::default()
194        };
195
196        let mut backoff_inf = ExponentialBackoff {
197            max_elapsed_time: None,
198            // remove maximum elapsed time for # of back-offs on inf.
199            .. ExponentialBackoff::default()
200        };
201
202        let mut context = R::Context::default();
203
204        loop {
205            let status: Status = 'request: loop {
206                // attempt to establish the stream
207                let response = request.connect(&context, channel.clone()).await;
208
209                let stream = match response {
210                    // success, we now have a stream and may begin waiting for messages
211                    Ok(stream) => stream,
212
213                    Err(status) => {
214                        break 'request status;
215                    }
216                };
217
218                let mut stream = std::pin::pin!(stream);
219
220                backoff.reset();
221                backoff_inf.reset();
222
223                #[allow(unused_labels)]
224                'message: loop {
225                    let message = stream.next().await.transpose();
226
227                    let message = match message {
228                        Ok(Some(message)) => message,
229                        Ok(None) => {
230                            // end of stream
231                            // hopefully due to configured limits or expected conditions
232                            return;
233                        }
234
235                        Err(status) => {
236                            break 'request status;
237                        }
238                    };
239
240                    R::update_context(&mut context, &message);
241
242                    yield Ok(message);
243                }
244            };
245
246            match status.code() {
247                tonic::Code::Unavailable | tonic::Code::ResourceExhausted => {
248                    // encountered a temporarily down or overloaded service
249                    sleep(backoff_inf.next_backoff().unwrap()).await;
250                }
251
252                tonic::Code::Unknown if status.message() == "error reading a body from connection: connection reset" => {
253                    // connection was aborted by the server
254                    sleep(backoff_inf.next_backoff().unwrap()).await;
255                }
256
257                code if request.should_retry(code) => {
258                    if let Some(duration) = backoff.next_backoff() {
259                        sleep(duration).await;
260                    } else {
261                        // maximum time allowed has elapsed
262                        // NOTE: it should be impossible to reach here without capturing at least one error
263                        yield Err(Error::TimedOut(Error::from(status).into()));
264                        return;
265                    }
266                }
267                _ => {
268                    // encountered an un-recoverable failure when attempting
269                    // to establish the stream
270                    yield Err(Error::from(status));
271                    return;
272                }
273            }
274        }
275    }
276}