1use std::collections::HashMap;
4use std::{
5 mem,
6 task,
7};
8
9use futures_core::future::BoxFuture;
10use futures_core::stream::BoxStream;
11use futures_core::Stream;
12use futures_util::TryStreamExt;
13use hiero_sdk_proto::mirror;
14use hiero_sdk_proto::mirror::consensus_service_client::ConsensusServiceClient;
15use hiero_sdk_proto::mirror::ConsensusTopicQuery;
16use time::{
17 Duration,
18 OffsetDateTime,
19};
20use tonic::transport::Channel;
21use tonic::Response;
22
23use super::topic_message::{
24 PbTopicMessageChunk,
25 PbTopicMessageHeader,
26};
27use crate::mirror_query::{
28 AnyMirrorQueryData,
29 AnyMirrorQueryMessage,
30 MirrorRequest,
31};
32use crate::protobuf::FromProtobuf;
33use crate::{
34 AnyMirrorQueryResponse,
35 MirrorQuery,
36 ToProtobuf,
37 TopicId,
38 TopicMessage,
39 TransactionId,
40};
41
42#[derive(Default)]
48pub struct TopicMessageQueryContext {
49 start_time: Option<OffsetDateTime>,
50}
51
52pub type TopicMessageQuery = MirrorQuery<TopicMessageQueryData>;
55
56#[derive(Debug, Default, Clone)]
57pub struct TopicMessageQueryData {
58 topic_id: Option<TopicId>,
60
61 start_time: Option<OffsetDateTime>,
64
65 end_time: Option<OffsetDateTime>,
67
68 limit: u64,
70}
71
72impl TopicMessageQueryData {
73 fn map_stream<'a, S>(stream: S) -> impl Stream<Item = crate::Result<TopicMessage>>
74 where
75 S: Stream<Item = crate::Result<mirror::ConsensusTopicResponse>> + Send + 'a,
76 {
77 MessagesMapStream { inner: stream, incomplete_messages: HashMap::new() }
78 }
79}
80
81impl TopicMessageQuery {
82 #[must_use]
84 pub fn get_topic_id(&self) -> Option<TopicId> {
85 self.data.topic_id
86 }
87
88 pub fn topic_id(&mut self, id: impl Into<TopicId>) -> &mut Self {
90 self.data.topic_id = Some(id.into());
91 self
92 }
93
94 #[must_use]
96 pub fn get_start_time(&self) -> Option<OffsetDateTime> {
97 self.data.start_time
98 }
99
100 pub fn start_time(&mut self, time: OffsetDateTime) -> &mut Self {
103 self.data.start_time = Some(time);
104 self
105 }
106
107 #[must_use]
109 pub fn get_end_time(&self) -> Option<OffsetDateTime> {
110 self.data.end_time
111 }
112
113 pub fn end_time(&mut self, time: OffsetDateTime) -> &mut Self {
115 self.data.end_time = Some(time);
116 self
117 }
118
119 #[must_use]
121 pub fn get_limit(&self) -> u64 {
122 self.data.limit
123 }
124
125 pub fn limit(&mut self, limit: u64) -> &mut Self {
128 self.data.limit = limit;
129 self
130 }
131}
132
133impl From<TopicMessageQueryData> for AnyMirrorQueryData {
134 fn from(data: TopicMessageQueryData) -> Self {
135 Self::TopicMessage(data)
136 }
137}
138
139impl MirrorRequest for TopicMessageQueryData {
140 type GrpcItem = mirror::ConsensusTopicResponse;
141
142 type ConnectStream = tonic::Streaming<Self::GrpcItem>;
143
144 type Context = TopicMessageQueryContext;
145
146 type Item = TopicMessage;
147
148 type Response = Vec<TopicMessage>;
149
150 type ItemStream<'a> = BoxStream<'a, crate::Result<TopicMessage>>;
151
152 fn connect(
153 &self,
154 context: &Self::Context,
155 channel: Channel,
156 ) -> BoxFuture<'_, tonic::Result<Self::ConnectStream>> {
157 let topic_id = self.topic_id.to_protobuf();
158
159 let consensus_end_time = self.end_time.map(Into::into);
160
161 let consensus_start_time = context
164 .start_time
165 .map(|it| it.checked_add(Duration::nanoseconds(1)).unwrap())
166 .or(self.start_time)
167 .map(Into::into);
168
169 let request = ConsensusTopicQuery {
170 consensus_end_time,
171 consensus_start_time,
172 topic_id,
173 limit: self.limit,
174 };
175
176 Box::pin(async move {
177 ConsensusServiceClient::new(channel)
178 .subscribe_topic(request)
179 .await
180 .map(Response::into_inner)
181 })
182 }
183
184 fn make_item_stream<'a, S>(stream: S) -> Self::ItemStream<'a>
185 where
186 S: Stream<Item = crate::Result<Self::GrpcItem>> + Send + 'a,
187 {
188 Box::pin(Self::map_stream(stream))
189 }
190
191 fn try_collect<'a, S>(stream: S) -> BoxFuture<'a, crate::Result<Self::Response>>
192 where
193 S: Stream<Item = crate::Result<Self::GrpcItem>> + Send + 'a,
194 {
195 Box::pin(Self::map_stream(stream).try_collect())
197 }
198
199 fn update_context(context: &mut Self::Context, item: &Self::GrpcItem) {
200 context.start_time =
201 item.consensus_timestamp.map(OffsetDateTime::from).or(context.start_time);
202 }
203}
204
205impl From<TopicMessage> for AnyMirrorQueryMessage {
206 fn from(value: TopicMessage) -> Self {
207 Self::TopicMessage(value)
208 }
209}
210
211impl From<Vec<TopicMessage>> for AnyMirrorQueryResponse {
212 fn from(value: Vec<TopicMessage>) -> Self {
213 Self::TopicMessage(value)
214 }
215}
216
217enum IncompleteMessage {
218 Partial(OffsetDateTime, Vec<PbTopicMessageChunk>),
219 Expired,
220 Complete,
221}
222
223impl IncompleteMessage {
224 fn handle_expiry(&mut self) -> &mut Self {
225 match self {
226 IncompleteMessage::Partial(expiry, _) if *expiry < OffsetDateTime::now_utc() => {
227 *self = Self::Expired;
228 }
229 _ => {}
230 }
231
232 self
233 }
234}
235
236pin_project_lite::pin_project! {
237 struct MessagesMapStream<S> {
238 #[pin]
239 inner: S,
240 incomplete_messages: HashMap<TransactionId, IncompleteMessage>,
241 }
242}
243
244impl<S> Stream for MessagesMapStream<S>
245where
246 S: Stream<Item = crate::Result<mirror::ConsensusTopicResponse>> + Send,
247{
248 type Item = crate::Result<TopicMessage>;
249
250 fn poll_next(
251 self: std::pin::Pin<&mut Self>,
252 cx: &mut task::Context<'_>,
253 ) -> task::Poll<Option<Self::Item>> {
254 use task::Poll;
255
256 let mut this = self.project();
257
258 loop {
259 let item = match task::ready!(this.inner.as_mut().poll_next(cx)) {
260 Some(Ok(item)) => item,
261 Some(Err(e)) => return Poll::Ready(Some(Err(e))),
262 None => return Poll::Ready(None),
263 };
264
265 match filter_map(item, this.incomplete_messages) {
266 Ok(Some(item)) => return Poll::Ready(Some(Ok(item))),
267 Ok(None) => {}
268 Err(e) => return Poll::Ready(Some(Err(e))),
269 }
270 }
271 }
272}
273
274fn filter_map(
275 mut item: mirror::ConsensusTopicResponse,
276 incomplete_messages: &mut HashMap<TransactionId, IncompleteMessage>,
277) -> crate::Result<Option<TopicMessage>> {
278 let header = PbTopicMessageHeader {
279 consensus_timestamp: pb_getf!(item, consensus_timestamp)?.into(),
280 sequence_number: item.sequence_number,
281 running_hash: item.running_hash,
282 running_hash_version: item.running_hash_version,
283 message: item.message,
284 };
285
286 let item = match item.chunk_info.take() {
287 Some(chunk_info) if chunk_info.total > 1 => PbTopicMessageChunk {
288 header,
289 initial_transaction_id: TransactionId::from_protobuf(pb_getf!(
290 chunk_info,
291 initial_transaction_id
292 )?)?,
293 number: chunk_info.number,
294 total: chunk_info.total,
295 },
296 _ => return Ok(Some(TopicMessage::from_single(header))),
297 };
298
299 let tx_id = item.initial_transaction_id;
300
301 let entry = incomplete_messages.entry(tx_id).or_insert_with(|| {
302 IncompleteMessage::Partial(
303 OffsetDateTime::now_utc() + time::Duration::minutes(15),
305 Vec::new(),
306 )
307 });
308
309 let IncompleteMessage::Partial(_, messages) = entry.handle_expiry() else { return Ok(None) };
310
311 match messages.binary_search_by_key(&item.number, |it| it.number) {
312 Ok(_) => {}
314 Err(index) => messages.insert(index, item),
315 };
316
317 let total = messages.iter().map(|it| it.total).min().unwrap();
320
321 match messages.len() >= total as usize {
324 true => {
325 let messages = mem::take(messages);
326 *entry = IncompleteMessage::Complete;
327 Ok(Some(TopicMessage::from_chunks(messages)))
328 }
329
330 false => Ok(None),
331 }
332}
333
334#[cfg(test)]
335mod tests {
336 use time::OffsetDateTime;
337
338 use crate::{
339 TopicId,
340 TopicMessageQuery,
341 };
342
343 #[test]
344 fn get_set_topic_id() {
345 let mut query = TopicMessageQuery::new();
346 query.topic_id(TopicId::new(31, 41, 59));
347
348 assert_eq!(query.get_topic_id(), Some(TopicId::new(31, 41, 59)));
349 }
350 #[test]
351 fn get_set_start_time() {
352 let start_time = OffsetDateTime::now_utc();
353
354 let mut query = TopicMessageQuery::new();
355 query.start_time(start_time);
356
357 assert_eq!(query.get_start_time(), Some(start_time));
358 }
359 #[test]
360 fn get_set_end_time() {
361 let end_time = OffsetDateTime::now_utc();
362
363 let mut query = TopicMessageQuery::new();
364 query.end_time(end_time);
365
366 assert_eq!(query.get_end_time(), Some(end_time));
367 }
368 #[test]
369 fn get_set_limit() {
370 let mut query = TopicMessageQuery::new();
371 query.limit(1415);
372
373 assert_eq!(query.get_limit(), 1415);
374 }
375}