1use crate::{
2 bytesrepr::{self, Bytes, FromBytes, ToBytes, U8_SERIALIZED_LENGTH},
3 checksummed_hex, crypto, EntityAddr, Key,
4};
5
6use alloc::{string::String, vec::Vec};
7use core::{convert::TryFrom, fmt::Debug};
8#[cfg(any(feature = "std", test))]
9use thiserror::Error;
10
11#[cfg(feature = "datasize")]
12use datasize::DataSize;
13#[cfg(any(feature = "testing", test))]
14use rand::{
15 distributions::{Alphanumeric, DistString, Distribution, Standard},
16 Rng,
17};
18#[cfg(feature = "json-schema")]
19use schemars::JsonSchema;
20use serde::{de::Error as SerdeError, Deserialize, Deserializer, Serialize, Serializer};
21
22#[cfg(any(feature = "testing", test))]
23use crate::testing::TestRng;
24
25use super::{FromStrError, TopicNameHash};
26
27pub type Messages = Vec<Message>;
29
30pub const MESSAGE_CHECKSUM_LENGTH: usize = 32;
32
33const MESSAGE_CHECKSUM_STRING_PREFIX: &str = "message-checksum-";
34
35#[derive(Default, PartialOrd, Ord, PartialEq, Eq, Clone, Copy, Debug)]
38#[cfg_attr(feature = "datasize", derive(DataSize))]
39#[cfg_attr(
40 feature = "json-schema",
41 derive(JsonSchema),
42 schemars(description = "Message checksum as a formatted string.")
43)]
44pub struct MessageChecksum(
45 #[cfg_attr(feature = "json-schema", schemars(skip, with = "String"))]
46 pub [u8; MESSAGE_CHECKSUM_LENGTH],
47);
48
49impl MessageChecksum {
50 pub fn value(&self) -> [u8; MESSAGE_CHECKSUM_LENGTH] {
52 self.0
53 }
54
55 pub fn to_formatted_string(self) -> String {
57 format!(
58 "{}{}",
59 MESSAGE_CHECKSUM_STRING_PREFIX,
60 base16::encode_lower(&self.0),
61 )
62 }
63
64 pub fn from_formatted_str(input: &str) -> Result<Self, FromStrError> {
67 let hex_addr = input
68 .strip_prefix(MESSAGE_CHECKSUM_STRING_PREFIX)
69 .ok_or(FromStrError::InvalidPrefix)?;
70
71 let bytes =
72 <[u8; MESSAGE_CHECKSUM_LENGTH]>::try_from(checksummed_hex::decode(hex_addr)?.as_ref())?;
73 Ok(MessageChecksum(bytes))
74 }
75}
76
77impl ToBytes for MessageChecksum {
78 fn to_bytes(&self) -> Result<Vec<u8>, bytesrepr::Error> {
79 let mut buffer = bytesrepr::allocate_buffer(self)?;
80 buffer.append(&mut self.0.to_bytes()?);
81 Ok(buffer)
82 }
83
84 fn serialized_length(&self) -> usize {
85 self.0.serialized_length()
86 }
87}
88
89impl FromBytes for MessageChecksum {
90 fn from_bytes(bytes: &[u8]) -> Result<(Self, &[u8]), bytesrepr::Error> {
91 let (checksum, rem) = FromBytes::from_bytes(bytes)?;
92 Ok((MessageChecksum(checksum), rem))
93 }
94}
95
96impl Serialize for MessageChecksum {
97 fn serialize<S: Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
98 if serializer.is_human_readable() {
99 self.to_formatted_string().serialize(serializer)
100 } else {
101 self.0.serialize(serializer)
102 }
103 }
104}
105
106impl<'de> Deserialize<'de> for MessageChecksum {
107 fn deserialize<D: Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
108 if deserializer.is_human_readable() {
109 let formatted_string = String::deserialize(deserializer)?;
110 MessageChecksum::from_formatted_str(&formatted_string).map_err(SerdeError::custom)
111 } else {
112 let bytes = <[u8; MESSAGE_CHECKSUM_LENGTH]>::deserialize(deserializer)?;
113 Ok(MessageChecksum(bytes))
114 }
115 }
116}
117
118const MESSAGE_PAYLOAD_TAG_LENGTH: usize = U8_SERIALIZED_LENGTH;
119
120pub const MESSAGE_PAYLOAD_STRING_TAG: u8 = 0;
122pub const MESSAGE_PAYLOAD_BYTES_TAG: u8 = 1;
124
125#[derive(Clone, Eq, PartialEq, Serialize, Deserialize, Debug)]
127#[cfg_attr(feature = "datasize", derive(DataSize))]
128#[cfg_attr(feature = "json-schema", derive(JsonSchema))]
129pub enum MessagePayload {
130 String(String),
132 Bytes(Bytes),
134}
135
136impl MessagePayload {
137 #[cfg(any(feature = "testing", test))]
138 pub fn random(rng: &mut TestRng) -> Self {
140 let count = rng.gen_range(16..128);
141 if rng.gen() {
142 MessagePayload::String(Alphanumeric.sample_string(rng, count))
143 } else {
144 MessagePayload::Bytes(
145 std::iter::repeat_with(|| rng.gen())
146 .take(count)
147 .collect::<Vec<u8>>()
148 .into(),
149 )
150 }
151 }
152}
153
154impl<T> From<T> for MessagePayload
155where
156 T: Into<String>,
157{
158 fn from(value: T) -> Self {
159 Self::String(value.into())
160 }
161}
162
163impl From<Bytes> for MessagePayload {
164 fn from(bytes: Bytes) -> Self {
165 Self::Bytes(bytes)
166 }
167}
168
169impl ToBytes for MessagePayload {
170 fn to_bytes(&self) -> Result<Vec<u8>, bytesrepr::Error> {
171 let mut buffer = bytesrepr::allocate_buffer(self)?;
172 match self {
173 MessagePayload::String(message_string) => {
174 buffer.insert(0, MESSAGE_PAYLOAD_STRING_TAG);
175 buffer.extend(message_string.to_bytes()?);
176 }
177 MessagePayload::Bytes(message_bytes) => {
178 buffer.insert(0, MESSAGE_PAYLOAD_BYTES_TAG);
179 buffer.extend(message_bytes.to_bytes()?);
180 }
181 }
182 Ok(buffer)
183 }
184
185 fn serialized_length(&self) -> usize {
186 MESSAGE_PAYLOAD_TAG_LENGTH
187 + match self {
188 MessagePayload::String(message_string) => message_string.serialized_length(),
189 MessagePayload::Bytes(message_bytes) => message_bytes.serialized_length(),
190 }
191 }
192}
193
194impl FromBytes for MessagePayload {
195 fn from_bytes(bytes: &[u8]) -> Result<(Self, &[u8]), bytesrepr::Error> {
196 let (tag, remainder) = u8::from_bytes(bytes)?;
197 match tag {
198 MESSAGE_PAYLOAD_STRING_TAG => {
199 let (message, remainder): (String, _) = FromBytes::from_bytes(remainder)?;
200 Ok((Self::String(message), remainder))
201 }
202 MESSAGE_PAYLOAD_BYTES_TAG => {
203 let (message_bytes, remainder): (Bytes, _) = FromBytes::from_bytes(remainder)?;
204 Ok((Self::Bytes(message_bytes), remainder))
205 }
206 _ => Err(bytesrepr::Error::Formatting),
207 }
208 }
209}
210
211#[derive(Clone, Eq, PartialEq, Debug)]
213#[cfg_attr(feature = "datasize", derive(DataSize))]
214#[cfg_attr(feature = "json-schema", derive(JsonSchema))]
215pub struct Message {
216 entity_addr: EntityAddr,
218 message: MessagePayload,
220 topic_name: String,
222 topic_name_hash: TopicNameHash,
224 topic_index: u32,
226 block_index: u64,
228}
229
230#[cfg(any(feature = "std", test))]
231#[derive(Serialize, Deserialize)]
232struct HumanReadableMessage {
233 entity_addr: String,
234 message: MessagePayload,
235 topic_name: String,
236 topic_name_hash: TopicNameHash,
237 topic_index: u32,
238 block_index: u64,
239}
240
241#[cfg(any(feature = "std", test))]
242impl From<&Message> for HumanReadableMessage {
243 fn from(message: &Message) -> Self {
244 Self {
245 entity_addr: message.entity_addr.to_formatted_string(),
246 message: message.message.clone(),
247 topic_name: message.topic_name.clone(),
248 topic_name_hash: message.topic_name_hash,
249 topic_index: message.topic_index,
250 block_index: message.block_index,
251 }
252 }
253}
254
255#[cfg(any(feature = "std", test))]
256impl From<&Message> for NonHumanReadableMessage {
257 fn from(message: &Message) -> Self {
258 Self {
259 entity_addr: message.entity_addr,
260 message: message.message.clone(),
261 topic_name: message.topic_name.clone(),
262 topic_name_hash: message.topic_name_hash,
263 topic_index: message.topic_index,
264 block_index: message.block_index,
265 }
266 }
267}
268
269#[cfg(any(feature = "std", test))]
270impl From<NonHumanReadableMessage> for Message {
271 fn from(message: NonHumanReadableMessage) -> Self {
272 Self {
273 entity_addr: message.entity_addr,
274 message: message.message,
275 topic_name: message.topic_name,
276 topic_name_hash: message.topic_name_hash,
277 topic_index: message.topic_index,
278 block_index: message.block_index,
279 }
280 }
281}
282
283#[cfg(any(feature = "std", test))]
284#[derive(Error, Debug)]
285enum MessageDeserializationError {
286 #[error("{0}")]
287 FailedToParseEntityAddr(crate::addressable_entity::FromStrError),
288}
289
290#[cfg(any(feature = "std", test))]
291impl TryFrom<HumanReadableMessage> for Message {
292 type Error = MessageDeserializationError;
293 fn try_from(message: HumanReadableMessage) -> Result<Self, Self::Error> {
294 let entity_addr = EntityAddr::from_formatted_str(&message.entity_addr)
295 .map_err(Self::Error::FailedToParseEntityAddr)?;
296
297 Ok(Self {
298 entity_addr,
299 message: message.message,
300 topic_name: message.topic_name,
301 topic_name_hash: message.topic_name_hash,
302 topic_index: message.topic_index,
303 block_index: message.block_index,
304 })
305 }
306}
307
308#[cfg(any(feature = "std", test))]
309#[derive(Serialize, Deserialize)]
310struct NonHumanReadableMessage {
311 entity_addr: EntityAddr,
312 message: MessagePayload,
313 topic_name: String,
314 topic_name_hash: TopicNameHash,
315 topic_index: u32,
316 block_index: u64,
317}
318
319#[cfg(any(feature = "std", test))]
320impl Serialize for Message {
321 fn serialize<S: Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
322 if serializer.is_human_readable() {
323 HumanReadableMessage::from(self).serialize(serializer)
324 } else {
325 NonHumanReadableMessage::from(self).serialize(serializer)
326 }
327 }
328}
329
330#[cfg(any(feature = "std", test))]
331impl<'de> Deserialize<'de> for Message {
332 fn deserialize<D: Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
333 if deserializer.is_human_readable() {
334 let human_readable = HumanReadableMessage::deserialize(deserializer)?;
335 Message::try_from(human_readable)
336 .map_err(|error| SerdeError::custom(format!("{:?}", error)))
337 } else {
338 let non_human_readable = NonHumanReadableMessage::deserialize(deserializer)?;
339 Ok(Message::from(non_human_readable))
340 }
341 }
342}
343
344impl Message {
345 pub fn new(
347 source: EntityAddr,
348 message: MessagePayload,
349 topic_name: String,
350 topic_name_hash: TopicNameHash,
351 topic_index: u32,
352 block_index: u64,
353 ) -> Self {
354 Self {
355 entity_addr: source,
356 message,
357 topic_name,
358 topic_name_hash,
359 topic_index,
360 block_index,
361 }
362 }
363
364 pub fn entity_addr(&self) -> &EntityAddr {
366 &self.entity_addr
367 }
368
369 pub fn payload(&self) -> &MessagePayload {
371 &self.message
372 }
373
374 pub fn topic_name(&self) -> &str {
376 &self.topic_name
377 }
378
379 pub fn topic_name_hash(&self) -> &TopicNameHash {
381 &self.topic_name_hash
382 }
383
384 pub fn topic_index(&self) -> u32 {
386 self.topic_index
387 }
388
389 pub fn block_index(&self) -> u64 {
391 self.block_index
392 }
393
394 pub fn message_key(&self) -> Key {
397 Key::message(self.entity_addr, self.topic_name_hash, self.topic_index)
398 }
399
400 pub fn topic_key(&self) -> Key {
404 Key::message_topic(self.entity_addr, self.topic_name_hash)
405 }
406
407 pub fn checksum(&self) -> Result<MessageChecksum, bytesrepr::Error> {
409 let input = (&self.block_index, &self.message).to_bytes()?;
410 let checksum = crypto::blake2b(input);
411
412 Ok(MessageChecksum(checksum))
413 }
414
415 #[cfg(any(feature = "testing", test))]
417 pub fn random(rng: &mut TestRng) -> Self {
418 let count = rng.gen_range(16..128);
419 Self {
420 entity_addr: rng.gen(),
421 message: MessagePayload::random(rng),
422 topic_name: Alphanumeric.sample_string(rng, count),
423 topic_name_hash: rng.gen(),
424 topic_index: rng.gen(),
425 block_index: rng.gen(),
426 }
427 }
428}
429
430impl ToBytes for Message {
431 fn to_bytes(&self) -> Result<Vec<u8>, bytesrepr::Error> {
432 let mut buffer = bytesrepr::allocate_buffer(self)?;
433 buffer.append(&mut self.entity_addr.to_bytes()?);
434 buffer.append(&mut self.message.to_bytes()?);
435 buffer.append(&mut self.topic_name.to_bytes()?);
436 buffer.append(&mut self.topic_name_hash.to_bytes()?);
437 buffer.append(&mut self.topic_index.to_bytes()?);
438 buffer.append(&mut self.block_index.to_bytes()?);
439 Ok(buffer)
440 }
441
442 fn serialized_length(&self) -> usize {
443 self.entity_addr.serialized_length()
444 + self.message.serialized_length()
445 + self.topic_name.serialized_length()
446 + self.topic_name_hash.serialized_length()
447 + self.topic_index.serialized_length()
448 + self.block_index.serialized_length()
449 }
450}
451
452impl FromBytes for Message {
453 fn from_bytes(bytes: &[u8]) -> Result<(Self, &[u8]), bytesrepr::Error> {
454 let (entity_addr, rem) = FromBytes::from_bytes(bytes)?;
455 let (message, rem) = FromBytes::from_bytes(rem)?;
456 let (topic_name, rem) = FromBytes::from_bytes(rem)?;
457 let (topic_name_hash, rem) = FromBytes::from_bytes(rem)?;
458 let (topic_index, rem) = FromBytes::from_bytes(rem)?;
459 let (block_index, rem) = FromBytes::from_bytes(rem)?;
460 Ok((
461 Message {
462 entity_addr,
463 message,
464 topic_name,
465 topic_name_hash,
466 topic_index,
467 block_index,
468 },
469 rem,
470 ))
471 }
472}
473
474#[cfg(any(feature = "testing", test))]
475impl Distribution<Message> for Standard {
476 fn sample<R: Rng + ?Sized>(&self, rng: &mut R) -> Message {
477 let topic_name = Alphanumeric.sample_string(rng, 32);
478 let topic_name_hash = crypto::blake2b(&topic_name).into();
479 let message = Alphanumeric.sample_string(rng, 64).into();
480
481 Message {
482 entity_addr: rng.gen(),
483 message,
484 topic_name,
485 topic_name_hash,
486 topic_index: rng.gen(),
487 block_index: rng.gen(),
488 }
489 }
490}
491
492#[cfg(test)]
493mod tests {
494 use crate::bytesrepr;
495
496 use super::*;
497
498 #[test]
499 fn serialization_roundtrip() {
500 let rng = &mut TestRng::new();
501
502 let message_checksum = MessageChecksum([1; MESSAGE_CHECKSUM_LENGTH]);
503 bytesrepr::test_serialization_roundtrip(&message_checksum);
504
505 let message_payload = MessagePayload::random(rng);
506 bytesrepr::test_serialization_roundtrip(&message_payload);
507
508 let message = Message::random(rng);
509 bytesrepr::test_serialization_roundtrip(&message);
510 }
511
512 #[test]
513 fn json_roundtrip() {
514 let rng = &mut TestRng::new();
515
516 let message_payload = MessagePayload::random(rng);
517 let json_string = serde_json::to_string_pretty(&message_payload).unwrap();
518 let decoded: MessagePayload = serde_json::from_str(&json_string).unwrap();
519 assert_eq!(decoded, message_payload);
520 }
521
522 #[test]
523 fn message_json_roundtrip() {
524 let rng = &mut TestRng::new();
525
526 let message = Message::random(rng);
527 let json_string = serde_json::to_string_pretty(&message).unwrap();
528 let decoded: Message = serde_json::from_str(&json_string).unwrap();
529 assert_eq!(decoded, message);
530 }
531}