danube_core/
message.rs

1use serde::{Deserialize, Serialize};
2use std::collections::HashMap;
3use std::fmt::{self, Display, Formatter};
4
5use crate::proto::{MsgId, StreamMessage as ProtoStreamMessage};
6
7// TODO! messageID is very important as it will be used to identify the message
8// it should be constructed by producer, amended maybe by the broker and sent back to the consumer
9// the consumer will used the messageID in the ack mechanism so the Broker will easily identify the acked message
10// the below struct will be used by both client SDK and broker to identify the message.
11
12#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
13pub struct MessageID {
14    // Identifies the producer, associated with a unique topic
15    pub producer_id: u64,
16    // topic_name is the name of the topic the message belongs to
17    // this is required by the broker to send the ack to the correct topic
18    pub topic_name: String,
19    // broker_addr is the address of the broker that sent the message to the consumer
20    // this is required by the consumer to send the ack to the correct broker
21    pub broker_addr: String,
22    // Topic offset is the offset of the message within the topic
23    pub topic_offset: u64,
24}
25
26impl Display for MessageID {
27    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
28        write!(
29            f,
30            "topic:_{}_producer:_{}_topic_offset:_{}",
31            self.topic_name, self.producer_id, self.topic_offset,
32        )
33    }
34}
35
36#[derive(Debug, Clone, Serialize, Deserialize)]
37pub struct StreamMessage {
38    // Unique ID for tracking the message request
39    pub request_id: u64,
40    // Identifies the message, associated with a unique topic, subscription and the broker
41    pub msg_id: MessageID,
42    // The actual payload of the message
43    pub payload: Vec<u8>,
44    // Timestamp for when the message was published
45    pub publish_time: u64,
46    // Identifies the producer’s name
47    pub producer_name: String,
48    // subscription_name is the name of the subscription the consumer is subscribed to
49    // this is required by the broker to send the ack to the correct subscription
50    pub subscription_name: Option<String>,
51    // User-defined properties/attributes
52    pub attributes: HashMap<String, String>,
53}
54
55impl StreamMessage {
56    pub fn size(&self) -> usize {
57        self.payload.len()
58    }
59    pub fn add_subscription_name(&mut self, subscription_name: &String) {
60        self.subscription_name = Some(subscription_name.into());
61    }
62}
63
64impl From<MsgId> for MessageID {
65    fn from(proto_msg_id: MsgId) -> Self {
66        MessageID {
67            producer_id: proto_msg_id.producer_id,
68            topic_name: proto_msg_id.topic_name,
69            broker_addr: proto_msg_id.broker_addr,
70            topic_offset: proto_msg_id.topic_offset,
71        }
72    }
73}
74
75impl From<ProtoStreamMessage> for StreamMessage {
76    fn from(proto_stream_msg: ProtoStreamMessage) -> Self {
77        StreamMessage {
78            request_id: proto_stream_msg.request_id,
79            msg_id: proto_stream_msg.msg_id.map_or_else(
80                || panic!("Message ID cannot be None"),
81                |msg_id| msg_id.into(),
82            ),
83            payload: proto_stream_msg.payload,
84            publish_time: proto_stream_msg.publish_time,
85            producer_name: proto_stream_msg.producer_name,
86            subscription_name: Some(proto_stream_msg.subscription_name),
87            attributes: proto_stream_msg.attributes,
88        }
89    }
90}
91
92impl From<MessageID> for MsgId {
93    fn from(msg_id: MessageID) -> Self {
94        MsgId {
95            producer_id: msg_id.producer_id,
96            topic_name: msg_id.topic_name,
97            broker_addr: msg_id.broker_addr,
98            topic_offset: msg_id.topic_offset,
99        }
100    }
101}
102
103impl From<StreamMessage> for ProtoStreamMessage {
104    fn from(stream_msg: StreamMessage) -> Self {
105        ProtoStreamMessage {
106            request_id: stream_msg.request_id,
107            msg_id: Some(stream_msg.msg_id.into()), // Convert MessageID into MsgId
108            payload: stream_msg.payload,
109            publish_time: stream_msg.publish_time,
110            producer_name: stream_msg.producer_name,
111            subscription_name: stream_msg.subscription_name.unwrap_or_default(),
112            attributes: stream_msg.attributes,
113        }
114    }
115}