danube_core/
message.rs

1use serde::{Deserialize, Serialize};
2use std::collections::HashMap;
3use std::fmt::{self, Display, Formatter};
4
5use crate::proto::{MsgId, StreamMessage as ProtoStreamMessage};
6
// TODO: the message ID is very important, as it is used to identify the message.
// It should be constructed by the producer, possibly amended by the broker, and sent back to the consumer.
// The consumer will use the message ID in the ack mechanism so the broker can easily identify the acked message.
// The struct below is used by both the client SDK and the broker to identify the message.
11
12#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
13pub struct MessageID {
14    // Identifies the producer, associated with a unique topic
15    pub producer_id: u64,
16    // topic_name is the name of the topic the message belongs to
17    // this is required by the broker to send the ack to the correct topic
18    pub topic_name: String,
19    // broker_addr is the address of the broker that sent the message to the consumer
20    // this is required by the consumer to send the ack to the correct broker
21    pub broker_addr: String,
22    // Topic offset is the offset of the message within the topic
23    pub topic_offset: u64,
24}
25
26impl Display for MessageID {
27    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
28        write!(
29            f,
30            "topic:_{}_producer:_{}_topic_offset:_{}",
31            self.topic_name, self.producer_id, self.topic_offset,
32        )
33    }
34}
35
36#[derive(Debug, Clone, Serialize, Deserialize)]
37pub struct StreamMessage {
38    // Unique ID for tracking the message request
39    pub request_id: u64,
40    // Identifies the message, associated with a unique topic, subscription and the broker
41    pub msg_id: MessageID,
42    // The actual payload of the message
43    pub payload: Vec<u8>,
44    // Timestamp for when the message was published
45    pub publish_time: u64,
46    // Identifies the producer’s name
47    pub producer_name: String,
48    // subscription_name is the name of the subscription the consumer is subscribed to
49    // this is required by the broker to send the ack to the correct subscription
50    pub subscription_name: Option<String>,
51    // User-defined properties/attributes
52    pub attributes: HashMap<String, String>,
53    // Schema identification from schema registry
54    pub schema_id: Option<u64>,
55    pub schema_version: Option<u32>,
56}
57
58impl StreamMessage {
59    pub fn size(&self) -> usize {
60        self.payload.len()
61    }
62    pub fn add_subscription_name(&mut self, subscription_name: &String) {
63        self.subscription_name = Some(subscription_name.into());
64    }
65}
66
67impl From<MsgId> for MessageID {
68    fn from(proto_msg_id: MsgId) -> Self {
69        MessageID {
70            producer_id: proto_msg_id.producer_id,
71            topic_name: proto_msg_id.topic_name,
72            broker_addr: proto_msg_id.broker_addr,
73            topic_offset: proto_msg_id.topic_offset,
74        }
75    }
76}
77
78impl From<ProtoStreamMessage> for StreamMessage {
79    fn from(proto_stream_msg: ProtoStreamMessage) -> Self {
80        StreamMessage {
81            request_id: proto_stream_msg.request_id,
82            msg_id: proto_stream_msg.msg_id.map_or_else(
83                || panic!("Message ID cannot be None"),
84                |msg_id| msg_id.into(),
85            ),
86            payload: proto_stream_msg.payload,
87            publish_time: proto_stream_msg.publish_time,
88            producer_name: proto_stream_msg.producer_name,
89            subscription_name: Some(proto_stream_msg.subscription_name),
90            attributes: proto_stream_msg.attributes,
91            schema_id: proto_stream_msg.schema_id,
92            schema_version: proto_stream_msg.schema_version,
93        }
94    }
95}
96
97impl From<MessageID> for MsgId {
98    fn from(msg_id: MessageID) -> Self {
99        MsgId {
100            producer_id: msg_id.producer_id,
101            topic_name: msg_id.topic_name,
102            broker_addr: msg_id.broker_addr,
103            topic_offset: msg_id.topic_offset,
104        }
105    }
106}
107
108impl From<StreamMessage> for ProtoStreamMessage {
109    fn from(stream_msg: StreamMessage) -> Self {
110        ProtoStreamMessage {
111            request_id: stream_msg.request_id,
112            msg_id: Some(stream_msg.msg_id.into()), // Convert MessageID into MsgId
113            payload: stream_msg.payload,
114            publish_time: stream_msg.publish_time,
115            producer_name: stream_msg.producer_name,
116            subscription_name: stream_msg.subscription_name.unwrap_or_default(),
117            attributes: stream_msg.attributes,
118            schema_id: stream_msg.schema_id,
119            schema_version: stream_msg.schema_version,
120        }
121    }
122}