Skip to main content

rocketmq_common/common/message/
broker_message.rs

1// Copyright 2023 The RocketMQ Rust Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt;
16use std::fmt::Debug;
17use std::fmt::Display;
18
19use bytes::BytesMut;
20use cheetah_string::CheetahString;
21
22use crate::common::hasher::string_hasher::JavaStringHasher;
23use crate::common::message::message_envelope::MessageEnvelope;
24use crate::common::message::MessageTrait;
25use crate::common::message::MessageVersion;
26use crate::common::mix_all;
27use crate::MessageUtils;
28
29/// Broker internal message representation
30///
31/// Contains extension information needed by the Broker when processing messages, such as
32/// tags_code, encoding cache, etc. Only used internally by the Broker, not exposed to clients.
33///
34/// # Use Cases
35///
36/// - Broker receives messages and constructs BrokerMessage for writing to CommitLog
37/// - Store module internal message passing
38/// - Message filtering and index building
39pub struct BrokerMessage {
40    /// Message envelope (complete message + metadata)
41    envelope: MessageEnvelope,
42
43    /// Tags hash code (used for message filtering)
44    tags_code: i64,
45
46    /// Serialized properties string (performance optimization to avoid repeated serialization)
47    properties_string: CheetahString,
48
49    /// Message version
50    version: MessageVersion,
51
52    /// Encoding cache (avoids repeated serialization)
53    encoded_buff: Option<BytesMut>,
54
55    /// Whether encoding is completed
56    encode_completed: bool,
57}
58
59impl BrokerMessage {
60    /// Creates a new broker message
61    pub fn new(
62        envelope: MessageEnvelope,
63        tags_code: i64,
64        properties_string: CheetahString,
65        version: MessageVersion,
66    ) -> Self {
67        Self {
68            envelope,
69            tags_code,
70            properties_string,
71            version,
72            encoded_buff: None,
73            encode_completed: false,
74        }
75    }
76
77    /// Creates from message envelope (automatically calculates tags_code and properties_string)
78    pub fn from_envelope(envelope: MessageEnvelope) -> Self {
79        let tags_code = envelope
80            .tags()
81            .map(|tags| Self::calculate_tags_code(&tags))
82            .unwrap_or(0);
83
84        let properties_string = Self::serialize_properties(envelope.properties());
85
86        Self {
87            envelope,
88            tags_code,
89            properties_string,
90            version: MessageVersion::V1,
91            encoded_buff: None,
92            encode_completed: false,
93        }
94    }
95
96    // ===== Envelope Access =====
97
98    /// Gets the message envelope
99    #[inline]
100    pub fn envelope(&self) -> &MessageEnvelope {
101        &self.envelope
102    }
103
104    /// Gets the mutable message envelope
105    #[inline]
106    pub fn envelope_mut(&mut self) -> &mut MessageEnvelope {
107        &mut self.envelope
108    }
109
110    /// Consumes self and returns the envelope
111    #[inline]
112    pub fn into_envelope(self) -> MessageEnvelope {
113        self.envelope
114    }
115
116    // ===== Broker-Specific Field Access =====
117
118    /// Gets the tags hash code
119    #[inline]
120    pub fn tags_code(&self) -> i64 {
121        self.tags_code
122    }
123
124    /// Gets the serialized properties string
125    #[inline]
126    pub fn properties_string(&self) -> &str {
127        &self.properties_string
128    }
129
130    /// Gets the message version
131    #[inline]
132    pub fn version(&self) -> MessageVersion {
133        self.version
134    }
135
136    /// Gets the encoding cache
137    #[inline]
138    pub fn encoded_buff(&self) -> Option<&BytesMut> {
139        self.encoded_buff.as_ref()
140    }
141
142    /// Gets the mutable encoding cache
143    #[inline]
144    pub fn encoded_buff_mut(&mut self) -> Option<&mut BytesMut> {
145        self.encoded_buff.as_mut()
146    }
147
148    /// Checks if encoding is completed
149    #[inline]
150    pub fn is_encode_completed(&self) -> bool {
151        self.encode_completed
152    }
153
154    // ===== Modification Methods =====
155
156    /// Sets the encoding cache
157    pub fn set_encoded_buff(&mut self, buff: BytesMut) {
158        self.encoded_buff = Some(buff);
159        self.encode_completed = true;
160    }
161
162    /// Takes the encoding cache
163    pub fn take_encoded_buff(&mut self) -> Option<BytesMut> {
164        self.encode_completed = false;
165        self.encoded_buff.take()
166    }
167
168    /// Clears the encoding cache
169    pub fn clear_encoded_buff(&mut self) {
170        self.encoded_buff = None;
171        self.encode_completed = false;
172    }
173
174    /// Sets the message version
175    pub fn set_version(&mut self, version: MessageVersion) {
176        self.version = version;
177    }
178
179    /// Sets the tags code
180    pub fn set_tags_code(&mut self, tags_code: i64) {
181        self.tags_code = tags_code;
182    }
183
184    /// Sets the properties string
185    pub fn set_properties_string(&mut self, properties_string: CheetahString) {
186        self.properties_string = properties_string;
187    }
188
189    // ===== Utility Methods =====
190
191    /// Calculates the hash code for tags
192    pub fn calculate_tags_code(tags: &str) -> i64 {
193        if tags.is_empty() {
194            return 0;
195        }
196        JavaStringHasher::hash_str(tags) as i64
197    }
198
199    /// Serializes properties to string
200    fn serialize_properties(properties: &std::collections::HashMap<CheetahString, CheetahString>) -> CheetahString {
201        mix_all::properties_to_string(properties)
202    }
203
204    /// Deletes a property (synchronizes properties_string)
205    pub fn delete_property(&mut self, name: &str) {
206        self.envelope.message_mut().clear_property(name);
207
208        self.properties_string =
209            CheetahString::from_string(MessageUtils::delete_property(&self.properties_string, name));
210    }
211
212    /// Gets a property value
213    pub fn property(&self, name: &str) -> Option<CheetahString> {
214        self.envelope
215            .properties()
216            .get(&CheetahString::from_string(name.to_string()))
217            .cloned()
218    }
219}
220
221impl Debug for BrokerMessage {
222    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
223        f.debug_struct("BrokerMessage")
224            .field("envelope", &self.envelope)
225            .field("tags_code", &self.tags_code)
226            .field("properties_string", &self.properties_string)
227            .field("version", &self.version)
228            .field("encoded_buff", &self.encoded_buff.as_ref().map(|_| "****"))
229            .field("encode_completed", &self.encode_completed)
230            .finish()
231    }
232}
233
234impl Display for BrokerMessage {
235    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
236        write!(
237            f,
238            "BrokerMessage {{ envelope: {}, tags_code: {}, version: {} }}",
239            self.envelope, self.tags_code, self.version
240        )
241    }
242}
243
244impl Default for BrokerMessage {
245    fn default() -> Self {
246        Self {
247            envelope: MessageEnvelope::default(),
248            tags_code: 0,
249            properties_string: CheetahString::new(),
250            version: MessageVersion::V1,
251            encoded_buff: None,
252            encode_completed: false,
253        }
254    }
255}
256
257// Delegate common trait implementations to envelope
258impl MessageTrait for BrokerMessage {
259    fn put_property(&mut self, key: CheetahString, value: CheetahString) {
260        self.envelope.message_mut().put_property(key, value);
261        // Update properties_string
262        self.properties_string = Self::serialize_properties(self.envelope.properties());
263    }
264
265    fn clear_property(&mut self, name: &str) {
266        self.delete_property(name);
267    }
268
269    fn property(&self, name: &CheetahString) -> Option<CheetahString> {
270        self.envelope.properties().get(name).cloned()
271    }
272
273    fn property_ref(&self, name: &CheetahString) -> Option<&CheetahString> {
274        self.envelope.properties().get(name)
275    }
276
277    fn topic(&self) -> &CheetahString {
278        self.envelope.topic()
279    }
280
281    fn set_topic(&mut self, topic: CheetahString) {
282        self.envelope.message_mut().set_topic(topic);
283    }
284
285    fn get_flag(&self) -> i32 {
286        self.envelope.flag()
287    }
288
289    fn set_flag(&mut self, flag: i32) {
290        self.envelope.message_mut().set_flag(flag);
291    }
292
293    fn get_body(&self) -> Option<&bytes::Bytes> {
294        self.envelope.message().get_body()
295    }
296
297    fn set_body(&mut self, body: bytes::Bytes) {
298        self.envelope.message_mut().set_body(Some(body));
299    }
300
301    fn get_properties(&self) -> &std::collections::HashMap<CheetahString, CheetahString> {
302        self.envelope.properties()
303    }
304
305    fn set_properties(&mut self, properties: std::collections::HashMap<CheetahString, CheetahString>) {
306        self.envelope.message_mut().set_properties(properties.clone());
307        self.properties_string = Self::serialize_properties(&properties);
308    }
309
310    fn transaction_id(&self) -> Option<&CheetahString> {
311        MessageTrait::transaction_id(self.envelope.message())
312    }
313
314    fn set_transaction_id(&mut self, transaction_id: CheetahString) {
315        self.envelope.message_mut().set_transaction_id(transaction_id);
316    }
317
318    fn get_compressed_body_mut(&mut self) -> Option<&mut bytes::Bytes> {
319        self.envelope.message_mut().get_compressed_body_mut()
320    }
321
322    fn get_compressed_body(&self) -> Option<&bytes::Bytes> {
323        self.envelope.message().get_compressed_body()
324    }
325
326    fn set_compressed_body_mut(&mut self, compressed_body: bytes::Bytes) {
327        self.envelope.message_mut().set_compressed_body_mut(compressed_body);
328    }
329
330    fn take_body(&mut self) -> Option<bytes::Bytes> {
331        self.envelope.message_mut().take_body()
332    }
333
334    fn as_any(&self) -> &dyn std::any::Any {
335        self
336    }
337
338    fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
339        self
340    }
341}
342
343#[cfg(test)]
344mod tests {
345    use super::*;
346
347    #[test]
348    fn test_broker_message_creation() {
349        let envelope = MessageEnvelope::default();
350        let broker_msg = BrokerMessage::new(
351            envelope,
352            123,
353            CheetahString::from_static_str("key=value"),
354            MessageVersion::V1,
355        );
356
357        assert_eq!(broker_msg.tags_code(), 123);
358        assert_eq!(broker_msg.properties_string(), "key=value");
359        assert_eq!(broker_msg.version(), MessageVersion::V1);
360        assert!(!broker_msg.is_encode_completed());
361    }
362
363    #[test]
364    fn test_from_envelope() {
365        let envelope = MessageEnvelope::default();
366        let broker_msg = BrokerMessage::from_envelope(envelope);
367
368        assert_eq!(broker_msg.tags_code(), 0);
369        assert!(!broker_msg.is_encode_completed());
370    }
371
372    #[test]
373    fn test_encoded_buff() {
374        let mut broker_msg = BrokerMessage::default();
375        assert!(broker_msg.encoded_buff().is_none());
376
377        let buff = BytesMut::from(&b"test data"[..]);
378        broker_msg.set_encoded_buff(buff);
379
380        assert!(broker_msg.encoded_buff().is_some());
381        assert!(broker_msg.is_encode_completed());
382
383        broker_msg.clear_encoded_buff();
384        assert!(broker_msg.encoded_buff().is_none());
385        assert!(!broker_msg.is_encode_completed());
386    }
387
388    #[test]
389    fn test_calculate_tags_code() {
390        let code = BrokerMessage::calculate_tags_code("TagA");
391        assert_ne!(code, 0);
392
393        let empty_code = BrokerMessage::calculate_tags_code("");
394        assert_eq!(empty_code, 0);
395    }
396
397    #[test]
398    fn test_display() {
399        let broker_msg = BrokerMessage::default();
400        let display_str = format!("{}", broker_msg);
401        assert!(display_str.contains("BrokerMessage"));
402    }
403}