Skip to main content

danube_core/
message.rs

1use bytes::Bytes;
2use serde::{Deserialize, Serialize};
3use std::collections::HashMap;
4use std::fmt::{self, Display, Formatter};
5
6use crate::proto::{MsgId, StreamMessage as ProtoStreamMessage};
7
// TODO! The message ID is very important, as it is used to identify the message.
// It should be constructed by the producer, possibly amended by the broker, and sent back to the consumer.
// The consumer will use the message ID in the ack mechanism so the broker can easily identify the acked message.
// The struct below is used by both the client SDK and the broker to identify the message.
12
13#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
14pub struct MessageID {
15    // Identifies the producer, associated with a unique topic
16    pub producer_id: u64,
17    // topic_name is the name of the topic the message belongs to
18    // this is required by the broker to send the ack to the correct topic
19    pub topic_name: String,
20    // broker_addr is the address of the broker that sent the message to the consumer
21    // this is required by the consumer to send the ack to the correct broker
22    pub broker_addr: String,
23    // Topic offset is the offset of the message within the topic
24    pub topic_offset: u64,
25}
26
27impl Display for MessageID {
28    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
29        write!(
30            f,
31            "topic:_{}_producer:_{}_topic_offset:_{}",
32            self.topic_name, self.producer_id, self.topic_offset,
33        )
34    }
35}
36
37#[derive(Debug, Clone, Serialize, Deserialize)]
38pub struct StreamMessage {
39    // Unique ID for tracking the message request
40    pub request_id: u64,
41    // Identifies the message, associated with a unique topic, subscription and the broker
42    pub msg_id: MessageID,
43    // The actual payload of the message
44    pub payload: Bytes,
45    // Timestamp for when the message was published
46    pub publish_time: u64,
47    // Identifies the producer’s name
48    pub producer_name: String,
49    // subscription_name is the name of the subscription the consumer is subscribed to
50    // this is required by the broker to send the ack to the correct subscription
51    pub subscription_name: Option<String>,
52    // User-defined properties/attributes
53    pub attributes: HashMap<String, String>,
54    // Schema identification from schema registry
55    pub schema_id: Option<u64>,
56    pub schema_version: Option<u32>,
57    // Routing key for Key-Shared dispatch and application-level keying
58    pub routing_key: Option<String>,
59}
60
61impl StreamMessage {
62    pub fn size(&self) -> usize {
63        self.payload.len()
64    }
65    pub fn add_subscription_name(&mut self, subscription_name: &String) {
66        self.subscription_name = Some(subscription_name.into());
67    }
68    /// Returns the effective routing key for dispatch decisions.
69    /// Falls back to producer_name if no explicit routing_key is set.
70    pub fn effective_routing_key(&self) -> &str {
71        self.routing_key.as_deref().unwrap_or(&self.producer_name)
72    }
73}
74
75impl From<MsgId> for MessageID {
76    fn from(proto_msg_id: MsgId) -> Self {
77        MessageID {
78            producer_id: proto_msg_id.producer_id,
79            topic_name: proto_msg_id.topic_name,
80            broker_addr: proto_msg_id.broker_addr,
81            topic_offset: proto_msg_id.topic_offset,
82        }
83    }
84}
85
86impl From<ProtoStreamMessage> for StreamMessage {
87    fn from(proto_stream_msg: ProtoStreamMessage) -> Self {
88        StreamMessage {
89            request_id: proto_stream_msg.request_id,
90            msg_id: proto_stream_msg.msg_id.map_or_else(
91                || panic!("Message ID cannot be None"),
92                |msg_id| msg_id.into(),
93            ),
94            payload: proto_stream_msg.payload.into(),
95            publish_time: proto_stream_msg.publish_time,
96            producer_name: proto_stream_msg.producer_name,
97            subscription_name: Some(proto_stream_msg.subscription_name),
98            attributes: proto_stream_msg.attributes,
99            schema_id: proto_stream_msg.schema_id,
100            schema_version: proto_stream_msg.schema_version,
101            routing_key: proto_stream_msg.routing_key,
102        }
103    }
104}
105
106impl From<MessageID> for MsgId {
107    fn from(msg_id: MessageID) -> Self {
108        MsgId {
109            producer_id: msg_id.producer_id,
110            topic_name: msg_id.topic_name,
111            broker_addr: msg_id.broker_addr,
112            topic_offset: msg_id.topic_offset,
113        }
114    }
115}
116
117impl From<StreamMessage> for ProtoStreamMessage {
118    fn from(stream_msg: StreamMessage) -> Self {
119        ProtoStreamMessage {
120            request_id: stream_msg.request_id,
121            msg_id: Some(stream_msg.msg_id.into()), // Convert MessageID into MsgId
122            payload: stream_msg.payload.to_vec(),
123            publish_time: stream_msg.publish_time,
124            producer_name: stream_msg.producer_name,
125            subscription_name: stream_msg.subscription_name.unwrap_or_default(),
126            attributes: stream_msg.attributes,
127            schema_id: stream_msg.schema_id,
128            schema_version: stream_msg.schema_version,
129            routing_key: stream_msg.routing_key,
130        }
131    }
132}