Skip to main content

hiero_sdk/topic/
topic_message_query.rs

1// SPDX-License-Identifier: Apache-2.0
2
3use std::collections::HashMap;
4use std::{
5    mem,
6    task,
7};
8
9use futures_core::future::BoxFuture;
10use futures_core::stream::BoxStream;
11use futures_core::Stream;
12use futures_util::TryStreamExt;
13use hiero_sdk_proto::mirror;
14use hiero_sdk_proto::mirror::consensus_service_client::ConsensusServiceClient;
15use hiero_sdk_proto::mirror::ConsensusTopicQuery;
16use time::{
17    Duration,
18    OffsetDateTime,
19};
20use tonic::transport::Channel;
21use tonic::Response;
22
23use super::topic_message::{
24    PbTopicMessageChunk,
25    PbTopicMessageHeader,
26};
27use crate::mirror_query::{
28    AnyMirrorQueryData,
29    AnyMirrorQueryMessage,
30    MirrorRequest,
31};
32use crate::protobuf::FromProtobuf;
33use crate::{
34    AnyMirrorQueryResponse,
35    MirrorQuery,
36    ToProtobuf,
37    TopicId,
38    TopicMessage,
39    TransactionId,
40};
41
42// TODO: test, test, and test
43// TODO: investigate failure scenarios
44
45// TODO: validate checksums after PR is merged
46
47#[derive(Default)]
48pub struct TopicMessageQueryContext {
49    start_time: Option<OffsetDateTime>,
50}
51
/// Query a stream of Hiero Consensus Service (HCS)
/// messages for an HCS Topic within a specific (possibly open-ended) time range.
///
/// This is a [`MirrorQuery`] specialized with [`TopicMessageQueryData`].
pub type TopicMessageQuery = MirrorQuery<TopicMessageQueryData>;
55
/// Parameters of a [`TopicMessageQuery`]: the topic to subscribe to, the consensus time
/// window, and the message limit.
#[derive(Debug, Default, Clone)]
pub struct TopicMessageQueryData {
    /// The topic ID to retrieve messages for.
    topic_id: Option<TopicId>,

    /// Include messages which reached consensus on or after this time.
    /// Defaults to the current time.
    start_time: Option<OffsetDateTime>,

    /// Include messages which reached consensus before this time.
    end_time: Option<OffsetDateTime>,

    /// The maximum number of messages to receive before stopping.
    /// The derived default is `0`; the `limit` setter documents the default as _unlimited_.
    limit: u64,
}
71
72impl TopicMessageQueryData {
73    fn map_stream<'a, S>(stream: S) -> impl Stream<Item = crate::Result<TopicMessage>>
74    where
75        S: Stream<Item = crate::Result<mirror::ConsensusTopicResponse>> + Send + 'a,
76    {
77        MessagesMapStream { inner: stream, incomplete_messages: HashMap::new() }
78    }
79}
80
81impl TopicMessageQuery {
82    /// Returns the ID of the topic to retrieve messages for.
83    #[must_use]
84    pub fn get_topic_id(&self) -> Option<TopicId> {
85        self.data.topic_id
86    }
87
88    /// Sets the topic ID to retrieve messages for.
89    pub fn topic_id(&mut self, id: impl Into<TopicId>) -> &mut Self {
90        self.data.topic_id = Some(id.into());
91        self
92    }
93
94    /// Returns the minimum `consensus_timestamp` of the messages to return.
95    #[must_use]
96    pub fn get_start_time(&self) -> Option<OffsetDateTime> {
97        self.data.start_time
98    }
99
100    /// Sets to include messages which reached consensus on or after this time.
101    /// Defaults to the current time.
102    pub fn start_time(&mut self, time: OffsetDateTime) -> &mut Self {
103        self.data.start_time = Some(time);
104        self
105    }
106
107    /// Returns the maximum `consensus_timestamp` of the messages to return.
108    #[must_use]
109    pub fn get_end_time(&self) -> Option<OffsetDateTime> {
110        self.data.end_time
111    }
112
113    /// Sets to include messages which reached consensus before this time.
114    pub fn end_time(&mut self, time: OffsetDateTime) -> &mut Self {
115        self.data.end_time = Some(time);
116        self
117    }
118
119    /// Returns maximum number of messages to be returned.
120    #[must_use]
121    pub fn get_limit(&self) -> u64 {
122        self.data.limit
123    }
124
125    /// Sets the maximum number of messages to be returned, before closing the subscription.
126    /// Defaults to _unlimited_.
127    pub fn limit(&mut self, limit: u64) -> &mut Self {
128        self.data.limit = limit;
129        self
130    }
131}
132
133impl From<TopicMessageQueryData> for AnyMirrorQueryData {
134    fn from(data: TopicMessageQueryData) -> Self {
135        Self::TopicMessage(data)
136    }
137}
138
139impl MirrorRequest for TopicMessageQueryData {
140    type GrpcItem = mirror::ConsensusTopicResponse;
141
142    type ConnectStream = tonic::Streaming<Self::GrpcItem>;
143
144    type Context = TopicMessageQueryContext;
145
146    type Item = TopicMessage;
147
148    type Response = Vec<TopicMessage>;
149
150    type ItemStream<'a> = BoxStream<'a, crate::Result<TopicMessage>>;
151
152    fn connect(
153        &self,
154        context: &Self::Context,
155        channel: Channel,
156    ) -> BoxFuture<'_, tonic::Result<Self::ConnectStream>> {
157        let topic_id = self.topic_id.to_protobuf();
158
159        let consensus_end_time = self.end_time.map(Into::into);
160
161        // If we had to reconnect, we want to start 1ns after the last message we recieved.
162        // We don't want to start *at* the last message we recieved because that'd give us that message again.
163        let consensus_start_time = context
164            .start_time
165            .map(|it| it.checked_add(Duration::nanoseconds(1)).unwrap())
166            .or(self.start_time)
167            .map(Into::into);
168
169        let request = ConsensusTopicQuery {
170            consensus_end_time,
171            consensus_start_time,
172            topic_id,
173            limit: self.limit,
174        };
175
176        Box::pin(async move {
177            ConsensusServiceClient::new(channel)
178                .subscribe_topic(request)
179                .await
180                .map(Response::into_inner)
181        })
182    }
183
184    fn make_item_stream<'a, S>(stream: S) -> Self::ItemStream<'a>
185    where
186        S: Stream<Item = crate::Result<Self::GrpcItem>> + Send + 'a,
187    {
188        Box::pin(Self::map_stream(stream))
189    }
190
191    fn try_collect<'a, S>(stream: S) -> BoxFuture<'a, crate::Result<Self::Response>>
192    where
193        S: Stream<Item = crate::Result<Self::GrpcItem>> + Send + 'a,
194    {
195        // this doesn't reuse the work in `make_item_stream`
196        Box::pin(Self::map_stream(stream).try_collect())
197    }
198
199    fn update_context(context: &mut Self::Context, item: &Self::GrpcItem) {
200        context.start_time =
201            item.consensus_timestamp.map(OffsetDateTime::from).or(context.start_time);
202    }
203}
204
205impl From<TopicMessage> for AnyMirrorQueryMessage {
206    fn from(value: TopicMessage) -> Self {
207        Self::TopicMessage(value)
208    }
209}
210
211impl From<Vec<TopicMessage>> for AnyMirrorQueryResponse {
212    fn from(value: Vec<TopicMessage>) -> Self {
213        Self::TopicMessage(value)
214    }
215}
216
/// Reassembly state of one chunked topic message, tracked per initial transaction ID.
enum IncompleteMessage {
    /// Some chunks have been received; holds the instant after which the message is
    /// considered expired, and the chunks received so far (kept sorted by chunk `number`).
    Partial(OffsetDateTime, Vec<PbTopicMessageChunk>),
    /// The expiry instant passed before the message was completed; further chunks for it
    /// are dropped.
    Expired,
    /// The message was already assembled and emitted; further chunks for it are dropped.
    Complete,
}
222
223impl IncompleteMessage {
224    fn handle_expiry(&mut self) -> &mut Self {
225        match self {
226            IncompleteMessage::Partial(expiry, _) if *expiry < OffsetDateTime::now_utc() => {
227                *self = Self::Expired;
228            }
229            _ => {}
230        }
231
232        self
233    }
234}
235
// Stream adapter that turns raw `ConsensusTopicResponse`s into `TopicMessage`s,
// buffering the chunks of multi-chunk messages until they can be emitted as one message.
pin_project_lite::pin_project! {
    struct MessagesMapStream<S> {
        // The underlying stream of raw responses; pinned because it is polled in place.
        #[pin]
        inner: S,
        // Reassembly state of chunked messages, keyed by their initial transaction ID.
        incomplete_messages: HashMap<TransactionId, IncompleteMessage>,
    }
}
243
244impl<S> Stream for MessagesMapStream<S>
245where
246    S: Stream<Item = crate::Result<mirror::ConsensusTopicResponse>> + Send,
247{
248    type Item = crate::Result<TopicMessage>;
249
250    fn poll_next(
251        self: std::pin::Pin<&mut Self>,
252        cx: &mut task::Context<'_>,
253    ) -> task::Poll<Option<Self::Item>> {
254        use task::Poll;
255
256        let mut this = self.project();
257
258        loop {
259            let item = match task::ready!(this.inner.as_mut().poll_next(cx)) {
260                Some(Ok(item)) => item,
261                Some(Err(e)) => return Poll::Ready(Some(Err(e))),
262                None => return Poll::Ready(None),
263            };
264
265            match filter_map(item, this.incomplete_messages) {
266                Ok(Some(item)) => return Poll::Ready(Some(Ok(item))),
267                Ok(None) => {}
268                Err(e) => return Poll::Ready(Some(Err(e))),
269            }
270        }
271    }
272}
273
274fn filter_map(
275    mut item: mirror::ConsensusTopicResponse,
276    incomplete_messages: &mut HashMap<TransactionId, IncompleteMessage>,
277) -> crate::Result<Option<TopicMessage>> {
278    let header = PbTopicMessageHeader {
279        consensus_timestamp: pb_getf!(item, consensus_timestamp)?.into(),
280        sequence_number: item.sequence_number,
281        running_hash: item.running_hash,
282        running_hash_version: item.running_hash_version,
283        message: item.message,
284    };
285
286    let item = match item.chunk_info.take() {
287        Some(chunk_info) if chunk_info.total > 1 => PbTopicMessageChunk {
288            header,
289            initial_transaction_id: TransactionId::from_protobuf(pb_getf!(
290                chunk_info,
291                initial_transaction_id
292            )?)?,
293            number: chunk_info.number,
294            total: chunk_info.total,
295        },
296        _ => return Ok(Some(TopicMessage::from_single(header))),
297    };
298
299    let tx_id = item.initial_transaction_id;
300
301    let entry = incomplete_messages.entry(tx_id).or_insert_with(|| {
302        IncompleteMessage::Partial(
303            // todo: configurable?
304            OffsetDateTime::now_utc() + time::Duration::minutes(15),
305            Vec::new(),
306        )
307    });
308
309    let IncompleteMessage::Partial(_, messages) = entry.handle_expiry() else { return Ok(None) };
310
311    match messages.binary_search_by_key(&item.number, |it| it.number) {
312        // We have a duplicate `number`, so, we'll just ignore it (this is unspecified behavior)
313        Ok(_) => {}
314        Err(index) => messages.insert(index, item),
315    };
316
317    // find the smallest `total` so that we aren't susceptable to stuff like total changing (and getting bigger)
318    // later on there's a check that ensures that they all have the same total.
319    let total = messages.iter().map(|it| it.total).min().unwrap();
320
321    // note: because of the way we handle `total`, `total` can get *smaller*.
322
323    match messages.len() >= total as usize {
324        true => {
325            let messages = mem::take(messages);
326            *entry = IncompleteMessage::Complete;
327            Ok(Some(TopicMessage::from_chunks(messages)))
328        }
329
330        false => Ok(None),
331    }
332}
333
#[cfg(test)]
mod tests {
    use time::OffsetDateTime;

    use crate::{
        TopicId,
        TopicMessageQuery,
    };

    /// The topic ID handed to the setter is what the getter reports.
    #[test]
    fn get_set_topic_id() {
        let mut query = TopicMessageQuery::new();

        let topic_id = query.topic_id(TopicId::new(31, 41, 59)).get_topic_id();

        assert_eq!(topic_id, Some(TopicId::new(31, 41, 59)));
    }

    /// The start time handed to the setter is what the getter reports.
    #[test]
    fn get_set_start_time() {
        let expected = OffsetDateTime::now_utc();
        let mut query = TopicMessageQuery::new();

        let start_time = query.start_time(expected).get_start_time();

        assert_eq!(start_time, Some(expected));
    }

    /// The end time handed to the setter is what the getter reports.
    #[test]
    fn get_set_end_time() {
        let expected = OffsetDateTime::now_utc();
        let mut query = TopicMessageQuery::new();

        let end_time = query.end_time(expected).get_end_time();

        assert_eq!(end_time, Some(expected));
    }

    /// The limit handed to the setter is what the getter reports.
    #[test]
    fn get_set_limit() {
        let mut query = TopicMessageQuery::new();

        let limit = query.limit(1415).get_limit();

        assert_eq!(limit, 1415);
    }
}