hiero_sdk/topic/
topic_message.rs1use std::iter;
4
5use time::OffsetDateTime;
6
7use crate::TransactionId;
8
9#[non_exhaustive]
11#[derive(Clone, Debug)]
12pub struct TopicMessageChunk {
13 pub consensus_timestamp: OffsetDateTime,
15
16 pub content_size: usize,
18
19 pub running_hash: Vec<u8>,
21
22 pub sequence_number: u64,
24}
25
26#[non_exhaustive]
28#[derive(Clone, Debug)]
29pub struct TopicMessage {
30 pub consensus_timestamp: OffsetDateTime,
34
35 pub contents: Vec<u8>,
37
38 pub running_hash: Vec<u8>,
42
43 pub running_hash_version: u64,
47
48 pub sequence_number: u64,
53
54 pub chunks: Option<Vec<TopicMessageChunk>>,
56
57 pub transaction: Option<TransactionId>,
59}
60
61impl TopicMessage {
62 pub(crate) fn from_single(pb: PbTopicMessageHeader) -> Self {
63 Self {
64 consensus_timestamp: pb.consensus_timestamp,
65 contents: pb.message,
66 running_hash: pb.running_hash,
67 running_hash_version: pb.running_hash_version,
68 sequence_number: pb.sequence_number,
69 chunks: None,
70 transaction: None,
71 }
72 }
73
74 pub(crate) fn from_chunks(pb: Vec<PbTopicMessageChunk>) -> Self {
75 assert!(!pb.is_empty(), "no chunks provided to `TopicMessage::from_chunks`");
76
77 if log::log_enabled!(log::Level::Warn) {
78 let (first, rest) = pb.split_first().unwrap();
79
80 if !rest.iter().all(|it| first.total == it.total) {
81 log::warn!("`TopicMessageChunk` mismatched totals (ignoring)");
82 }
83
84 let all_ascending_no_gaps = pb.iter().all({
85 let mut current = 1;
86 move |it| {
87 let res = it.number == current;
88 current += 1;
89
90 res
91 }
92 });
93
94 if !all_ascending_no_gaps {
95 log::warn!("`TopicMessageChunk` mismatched numbers (ignoring)");
96 }
98 }
99
100 let contents = pb.iter().fold(Vec::new(), |mut acc, it| {
101 acc.extend_from_slice(&it.header.message);
102 acc
103 });
104
105 let mut pb = pb;
106
107 let last = pb.pop().unwrap();
108
109 let chunks = pb
110 .into_iter()
111 .map(|it| TopicMessageChunk {
112 consensus_timestamp: it.header.consensus_timestamp,
113 content_size: it.header.message.len(),
114 running_hash: it.header.running_hash,
115 sequence_number: it.header.sequence_number,
116 })
117 .chain(iter::once(TopicMessageChunk {
118 consensus_timestamp: last.header.consensus_timestamp,
119 content_size: last.header.message.len(),
120 running_hash: last.header.running_hash.clone(),
121 sequence_number: last.header.sequence_number,
122 }))
123 .collect();
124
125 Self {
126 consensus_timestamp: last.header.consensus_timestamp,
127 contents,
128 running_hash: last.header.running_hash,
129 running_hash_version: last.header.running_hash_version,
130 sequence_number: last.header.sequence_number,
131 chunks: Some(chunks),
132 transaction: Some(last.initial_transaction_id),
133 }
134 }
135}
136
137pub(crate) struct PbTopicMessageHeader {
138 pub(crate) consensus_timestamp: OffsetDateTime,
139 pub(crate) sequence_number: u64,
140 pub(crate) running_hash: Vec<u8>,
141 pub(crate) running_hash_version: u64,
142 pub(crate) message: Vec<u8>,
143}
144
145pub(crate) struct PbTopicMessageChunk {
146 pub(crate) header: PbTopicMessageHeader,
147 pub(crate) initial_transaction_id: TransactionId,
148 pub(crate) number: i32,
149 pub(crate) total: i32,
150}