hiero_sdk/mirror_query/
subscribe.rs1use async_stream::stream;
4use backoff::backoff::Backoff;
5use backoff::ExponentialBackoff;
6use futures_core::future::BoxFuture;
7use futures_core::Stream;
8use futures_util::StreamExt;
9use tokio::time::sleep;
10use tonic::transport::Channel;
11use tonic::Status;
12
13use crate::mirror_query::AnyMirrorQueryData;
14use crate::{
15 Client,
16 Error,
17 MirrorQuery,
18};
19
20impl<D> MirrorQuery<D>
21where
22 D: MirrorQueryExecute,
23{
24 #[allow(clippy::missing_errors_doc)]
27 pub async fn execute(&mut self, client: &Client) -> crate::Result<D::Response> {
28 self.execute_with_optional_timeout(client, None).await
29 }
30
31 pub(crate) async fn execute_with_optional_timeout(
32 &self,
33 client: &Client,
34 timeout: Option<std::time::Duration>,
35 ) -> crate::Result<D::Response> {
36 self.data.execute_with_optional_timeout(&self.common, client, timeout).await
37 }
38
39 #[allow(clippy::missing_errors_doc)]
44 pub async fn execute_with_timeout(
45 &mut self,
46 client: &Client,
47 timeout: std::time::Duration,
48 ) -> crate::Result<D::Response> {
49 self.execute_with_optional_timeout(client, Some(timeout)).await
50 }
51
52 pub fn subscribe<'a>(&self, client: &'a Client) -> D::ItemStream<'a> {
54 self.subscribe_with_optional_timeout(client, None)
55 }
56
57 pub fn subscribe_with_timeout<'a>(
61 &self,
62 client: &'a Client,
63 timeout: std::time::Duration,
64 ) -> D::ItemStream<'a> {
65 self.subscribe_with_optional_timeout(client, Some(timeout))
66 }
67
68 pub(crate) fn subscribe_with_optional_timeout<'a>(
69 &self,
70 client: &'a Client,
71 timeout: Option<std::time::Duration>,
72 ) -> D::ItemStream<'a> {
73 self.data.subscribe_with_optional_timeout(&self.common, client, timeout)
74 }
75}
76
77pub trait MirrorQueryExecute: Sized + Into<AnyMirrorQueryData> + Send + Sync {
78 type Item;
79 type Response;
80 type ItemStream<'a>: Stream<Item = crate::Result<Self::Item>> + 'a
81 where
82 Self: 'a;
83
84 fn subscribe_with_optional_timeout<'a>(
85 &self,
86 params: &crate::mirror_query::MirrorQueryCommon,
87 client: &'a crate::Client,
88 timeout: Option<std::time::Duration>,
89 ) -> Self::ItemStream<'a>
90 where
91 Self: 'a;
92
93 fn execute_with_optional_timeout<'a>(
94 &'a self,
95 params: &'a super::MirrorQueryCommon,
96 client: &'a Client,
97 timeout: Option<std::time::Duration>,
98 ) -> BoxFuture<'a, crate::Result<Self::Response>>;
99}
100
101impl<T> MirrorQueryExecute for T
102where
103 T: MirrorRequest + Sync + Clone + Into<AnyMirrorQueryData>,
104{
105 type Item = <Self as MirrorRequest>::Item;
106
107 type Response = <Self as MirrorRequest>::Response;
108
109 type ItemStream<'a>
110 = <Self as MirrorRequest>::ItemStream<'a>
111 where
112 Self: 'a;
113
114 fn subscribe_with_optional_timeout<'a>(
115 &self,
116 _params: &crate::mirror_query::MirrorQueryCommon,
117 client: &'a crate::Client,
118 timeout: Option<std::time::Duration>,
119 ) -> Self::ItemStream<'a>
120 where
121 Self: 'a,
122 {
123 let timeout = timeout.or_else(|| client.request_timeout()).unwrap_or_else(|| {
124 std::time::Duration::from_millis(backoff::default::MAX_ELAPSED_TIME_MILLIS)
125 });
126
127 let channel = client.mirrornet().load().channel(client.grpc_deadline());
129
130 Self::make_item_stream(crate::mirror_query::subscribe(channel, timeout, self.clone()))
131 }
132
133 fn execute_with_optional_timeout<'a>(
134 &'a self,
135 _params: &'a crate::mirror_query::MirrorQueryCommon,
136 client: &crate::Client,
137 timeout: Option<std::time::Duration>,
138 ) -> BoxFuture<'a, crate::Result<Self::Response>> {
139 let timeout = timeout.or_else(|| client.request_timeout()).unwrap_or_else(|| {
140 std::time::Duration::from_millis(backoff::default::MAX_ELAPSED_TIME_MILLIS)
141 });
142
143 let channel = client.mirrornet().load().channel(client.grpc_deadline());
145
146 Self::try_collect(crate::mirror_query::subscribe(channel, timeout, self.clone()))
147 }
148}
149
150pub trait MirrorRequest: Send {
151 type GrpcItem: Send;
152 type ConnectStream: Stream<Item = tonic::Result<Self::GrpcItem>> + Send;
153
154 type Item;
155 type Response;
156 type Context: Default + Send + Sync;
157
158 type ItemStream<'a>: Stream<Item = crate::Result<Self::Item>> + 'a;
159
160 fn connect(
161 &self,
162 context: &Self::Context,
163 channel: Channel,
164 ) -> BoxFuture<'_, tonic::Result<Self::ConnectStream>>;
165
166 #[allow(unused_variables)]
168 fn should_retry(&self, status_code: tonic::Code) -> bool {
169 false
170 }
171
172 fn make_item_stream<'a, S>(stream: S) -> Self::ItemStream<'a>
173 where
174 S: Stream<Item = crate::Result<Self::GrpcItem>> + Send + 'a;
175
176 fn update_context(context: &mut Self::Context, item: &Self::GrpcItem);
177
178 fn try_collect<'a, S>(stream: S) -> BoxFuture<'a, crate::Result<Self::Response>>
179 where
180 S: Stream<Item = crate::Result<Self::GrpcItem>> + Send + 'a;
181}
182
183pub(crate) fn subscribe<I: Send, R: MirrorRequest<GrpcItem = I> + Send + Sync>(
184 channel: Channel,
185 timeout: std::time::Duration,
186 request: R,
187) -> impl Stream<Item = crate::Result<I>> + Send {
188 stream! {
189 let request = request;
190
191 let mut backoff = ExponentialBackoff {
192 max_elapsed_time: Some(timeout),
193 ..ExponentialBackoff::default()
194 };
195
196 let mut backoff_inf = ExponentialBackoff {
197 max_elapsed_time: None,
198 .. ExponentialBackoff::default()
200 };
201
202 let mut context = R::Context::default();
203
204 loop {
205 let status: Status = 'request: loop {
206 let response = request.connect(&context, channel.clone()).await;
208
209 let stream = match response {
210 Ok(stream) => stream,
212
213 Err(status) => {
214 break 'request status;
215 }
216 };
217
218 let mut stream = std::pin::pin!(stream);
219
220 backoff.reset();
221 backoff_inf.reset();
222
223 #[allow(unused_labels)]
224 'message: loop {
225 let message = stream.next().await.transpose();
226
227 let message = match message {
228 Ok(Some(message)) => message,
229 Ok(None) => {
230 return;
233 }
234
235 Err(status) => {
236 break 'request status;
237 }
238 };
239
240 R::update_context(&mut context, &message);
241
242 yield Ok(message);
243 }
244 };
245
246 match status.code() {
247 tonic::Code::Unavailable | tonic::Code::ResourceExhausted => {
248 sleep(backoff_inf.next_backoff().unwrap()).await;
250 }
251
252 tonic::Code::Unknown if status.message() == "error reading a body from connection: connection reset" => {
253 sleep(backoff_inf.next_backoff().unwrap()).await;
255 }
256
257 code if request.should_retry(code) => {
258 if let Some(duration) = backoff.next_backoff() {
259 sleep(duration).await;
260 } else {
261 yield Err(Error::TimedOut(Error::from(status).into()));
264 return;
265 }
266 }
267 _ => {
268 yield Err(Error::from(status));
271 return;
272 }
273 }
274 }
275 }
276}