use std::collections::HashMap;
use std::{
mem,
task,
};
use futures_core::future::BoxFuture;
use futures_core::stream::BoxStream;
use futures_core::Stream;
use futures_util::TryStreamExt;
use hiero_sdk_proto::mirror;
use hiero_sdk_proto::mirror::consensus_service_client::ConsensusServiceClient;
use hiero_sdk_proto::mirror::ConsensusTopicQuery;
use time::{
Duration,
OffsetDateTime,
};
use tonic::transport::Channel;
use tonic::Response;
use super::topic_message::{
PbTopicMessageChunk,
PbTopicMessageHeader,
};
use crate::mirror_query::{
AnyMirrorQueryData,
AnyMirrorQueryMessage,
MirrorRequest,
};
use crate::protobuf::FromProtobuf;
use crate::{
AnyMirrorQueryResponse,
MirrorQuery,
ToProtobuf,
TopicId,
TopicMessage,
TransactionId,
};
#[derive(Default)]
pub struct TopicMessageQueryContext {
start_time: Option<OffsetDateTime>,
}
pub type TopicMessageQuery = MirrorQuery<TopicMessageQueryData>;
#[derive(Debug, Default, Clone)]
pub struct TopicMessageQueryData {
topic_id: Option<TopicId>,
start_time: Option<OffsetDateTime>,
end_time: Option<OffsetDateTime>,
limit: u64,
}
impl TopicMessageQueryData {
fn map_stream<'a, S>(stream: S) -> impl Stream<Item = crate::Result<TopicMessage>>
where
S: Stream<Item = crate::Result<mirror::ConsensusTopicResponse>> + Send + 'a,
{
MessagesMapStream { inner: stream, incomplete_messages: HashMap::new() }
}
}
impl TopicMessageQuery {
#[must_use]
pub fn get_topic_id(&self) -> Option<TopicId> {
self.data.topic_id
}
pub fn topic_id(&mut self, id: impl Into<TopicId>) -> &mut Self {
self.data.topic_id = Some(id.into());
self
}
#[must_use]
pub fn get_start_time(&self) -> Option<OffsetDateTime> {
self.data.start_time
}
pub fn start_time(&mut self, time: OffsetDateTime) -> &mut Self {
self.data.start_time = Some(time);
self
}
#[must_use]
pub fn get_end_time(&self) -> Option<OffsetDateTime> {
self.data.end_time
}
pub fn end_time(&mut self, time: OffsetDateTime) -> &mut Self {
self.data.end_time = Some(time);
self
}
#[must_use]
pub fn get_limit(&self) -> u64 {
self.data.limit
}
pub fn limit(&mut self, limit: u64) -> &mut Self {
self.data.limit = limit;
self
}
}
impl From<TopicMessageQueryData> for AnyMirrorQueryData {
fn from(data: TopicMessageQueryData) -> Self {
Self::TopicMessage(data)
}
}
impl MirrorRequest for TopicMessageQueryData {
type GrpcItem = mirror::ConsensusTopicResponse;
type ConnectStream = tonic::Streaming<Self::GrpcItem>;
type Context = TopicMessageQueryContext;
type Item = TopicMessage;
type Response = Vec<TopicMessage>;
type ItemStream<'a> = BoxStream<'a, crate::Result<TopicMessage>>;
fn connect(
&self,
context: &Self::Context,
channel: Channel,
) -> BoxFuture<'_, tonic::Result<Self::ConnectStream>> {
let topic_id = self.topic_id.to_protobuf();
let consensus_end_time = self.end_time.map(Into::into);
let consensus_start_time = context
.start_time
.map(|it| it.checked_add(Duration::nanoseconds(1)).unwrap())
.or(self.start_time)
.map(Into::into);
let request = ConsensusTopicQuery {
consensus_end_time,
consensus_start_time,
topic_id,
limit: self.limit,
};
Box::pin(async move {
ConsensusServiceClient::new(channel)
.subscribe_topic(request)
.await
.map(Response::into_inner)
})
}
fn make_item_stream<'a, S>(stream: S) -> Self::ItemStream<'a>
where
S: Stream<Item = crate::Result<Self::GrpcItem>> + Send + 'a,
{
Box::pin(Self::map_stream(stream))
}
fn try_collect<'a, S>(stream: S) -> BoxFuture<'a, crate::Result<Self::Response>>
where
S: Stream<Item = crate::Result<Self::GrpcItem>> + Send + 'a,
{
Box::pin(Self::map_stream(stream).try_collect())
}
fn update_context(context: &mut Self::Context, item: &Self::GrpcItem) {
context.start_time =
item.consensus_timestamp.map(OffsetDateTime::from).or(context.start_time);
}
}
impl From<TopicMessage> for AnyMirrorQueryMessage {
fn from(value: TopicMessage) -> Self {
Self::TopicMessage(value)
}
}
impl From<Vec<TopicMessage>> for AnyMirrorQueryResponse {
fn from(value: Vec<TopicMessage>) -> Self {
Self::TopicMessage(value)
}
}
enum IncompleteMessage {
Partial(OffsetDateTime, Vec<PbTopicMessageChunk>),
Expired,
Complete,
}
impl IncompleteMessage {
fn handle_expiry(&mut self) -> &mut Self {
match self {
IncompleteMessage::Partial(expiry, _) if *expiry < OffsetDateTime::now_utc() => {
*self = Self::Expired;
}
_ => {}
}
self
}
}
pin_project_lite::pin_project! {
struct MessagesMapStream<S> {
#[pin]
inner: S,
incomplete_messages: HashMap<TransactionId, IncompleteMessage>,
}
}
impl<S> Stream for MessagesMapStream<S>
where
S: Stream<Item = crate::Result<mirror::ConsensusTopicResponse>> + Send,
{
type Item = crate::Result<TopicMessage>;
fn poll_next(
self: std::pin::Pin<&mut Self>,
cx: &mut task::Context<'_>,
) -> task::Poll<Option<Self::Item>> {
use task::Poll;
let mut this = self.project();
loop {
let item = match task::ready!(this.inner.as_mut().poll_next(cx)) {
Some(Ok(item)) => item,
Some(Err(e)) => return Poll::Ready(Some(Err(e))),
None => return Poll::Ready(None),
};
match filter_map(item, this.incomplete_messages) {
Ok(Some(item)) => return Poll::Ready(Some(Ok(item))),
Ok(None) => {}
Err(e) => return Poll::Ready(Some(Err(e))),
}
}
}
}
fn filter_map(
mut item: mirror::ConsensusTopicResponse,
incomplete_messages: &mut HashMap<TransactionId, IncompleteMessage>,
) -> crate::Result<Option<TopicMessage>> {
let header = PbTopicMessageHeader {
consensus_timestamp: pb_getf!(item, consensus_timestamp)?.into(),
sequence_number: item.sequence_number,
running_hash: item.running_hash,
running_hash_version: item.running_hash_version,
message: item.message,
};
let item = match item.chunk_info.take() {
Some(chunk_info) if chunk_info.total > 1 => PbTopicMessageChunk {
header,
initial_transaction_id: TransactionId::from_protobuf(pb_getf!(
chunk_info,
initial_transaction_id
)?)?,
number: chunk_info.number,
total: chunk_info.total,
},
_ => return Ok(Some(TopicMessage::from_single(header))),
};
let tx_id = item.initial_transaction_id;
let entry = incomplete_messages.entry(tx_id).or_insert_with(|| {
IncompleteMessage::Partial(
OffsetDateTime::now_utc() + time::Duration::minutes(15),
Vec::new(),
)
});
let IncompleteMessage::Partial(_, messages) = entry.handle_expiry() else { return Ok(None) };
match messages.binary_search_by_key(&item.number, |it| it.number) {
Ok(_) => {}
Err(index) => messages.insert(index, item),
};
let total = messages.iter().map(|it| it.total).min().unwrap();
match messages.len() >= total as usize {
true => {
let messages = mem::take(messages);
*entry = IncompleteMessage::Complete;
Ok(Some(TopicMessage::from_chunks(messages)))
}
false => Ok(None),
}
}
#[cfg(test)]
mod tests {
use time::OffsetDateTime;
use crate::{
TopicId,
TopicMessageQuery,
};
#[test]
fn get_set_topic_id() {
let mut query = TopicMessageQuery::new();
query.topic_id(TopicId::new(31, 41, 59));
assert_eq!(query.get_topic_id(), Some(TopicId::new(31, 41, 59)));
}
#[test]
fn get_set_start_time() {
let start_time = OffsetDateTime::now_utc();
let mut query = TopicMessageQuery::new();
query.start_time(start_time);
assert_eq!(query.get_start_time(), Some(start_time));
}
#[test]
fn get_set_end_time() {
let end_time = OffsetDateTime::now_utc();
let mut query = TopicMessageQuery::new();
query.end_time(end_time);
assert_eq!(query.get_end_time(), Some(end_time));
}
#[test]
fn get_set_limit() {
let mut query = TopicMessageQuery::new();
query.limit(1415);
assert_eq!(query.get_limit(), 1415);
}
}