rocketmq_common/common/message/
broker_message.rs1use std::fmt;
16use std::fmt::Debug;
17use std::fmt::Display;
18
19use bytes::BytesMut;
20use cheetah_string::CheetahString;
21
22use crate::common::hasher::string_hasher::JavaStringHasher;
23use crate::common::message::message_envelope::MessageEnvelope;
24use crate::common::message::MessageTrait;
25use crate::common::message::MessageVersion;
26use crate::common::mix_all;
27use crate::MessageUtils;
28
29pub struct BrokerMessage {
40 envelope: MessageEnvelope,
42
43 tags_code: i64,
45
46 properties_string: CheetahString,
48
49 version: MessageVersion,
51
52 encoded_buff: Option<BytesMut>,
54
55 encode_completed: bool,
57}
58
59impl BrokerMessage {
60 pub fn new(
62 envelope: MessageEnvelope,
63 tags_code: i64,
64 properties_string: CheetahString,
65 version: MessageVersion,
66 ) -> Self {
67 Self {
68 envelope,
69 tags_code,
70 properties_string,
71 version,
72 encoded_buff: None,
73 encode_completed: false,
74 }
75 }
76
77 pub fn from_envelope(envelope: MessageEnvelope) -> Self {
79 let tags_code = envelope
80 .tags()
81 .map(|tags| Self::calculate_tags_code(&tags))
82 .unwrap_or(0);
83
84 let properties_string = Self::serialize_properties(envelope.properties());
85
86 Self {
87 envelope,
88 tags_code,
89 properties_string,
90 version: MessageVersion::V1,
91 encoded_buff: None,
92 encode_completed: false,
93 }
94 }
95
96 #[inline]
100 pub fn envelope(&self) -> &MessageEnvelope {
101 &self.envelope
102 }
103
104 #[inline]
106 pub fn envelope_mut(&mut self) -> &mut MessageEnvelope {
107 &mut self.envelope
108 }
109
110 #[inline]
112 pub fn into_envelope(self) -> MessageEnvelope {
113 self.envelope
114 }
115
116 #[inline]
120 pub fn tags_code(&self) -> i64 {
121 self.tags_code
122 }
123
124 #[inline]
126 pub fn properties_string(&self) -> &str {
127 &self.properties_string
128 }
129
130 #[inline]
132 pub fn version(&self) -> MessageVersion {
133 self.version
134 }
135
136 #[inline]
138 pub fn encoded_buff(&self) -> Option<&BytesMut> {
139 self.encoded_buff.as_ref()
140 }
141
142 #[inline]
144 pub fn encoded_buff_mut(&mut self) -> Option<&mut BytesMut> {
145 self.encoded_buff.as_mut()
146 }
147
148 #[inline]
150 pub fn is_encode_completed(&self) -> bool {
151 self.encode_completed
152 }
153
154 pub fn set_encoded_buff(&mut self, buff: BytesMut) {
158 self.encoded_buff = Some(buff);
159 self.encode_completed = true;
160 }
161
162 pub fn take_encoded_buff(&mut self) -> Option<BytesMut> {
164 self.encode_completed = false;
165 self.encoded_buff.take()
166 }
167
168 pub fn clear_encoded_buff(&mut self) {
170 self.encoded_buff = None;
171 self.encode_completed = false;
172 }
173
174 pub fn set_version(&mut self, version: MessageVersion) {
176 self.version = version;
177 }
178
179 pub fn set_tags_code(&mut self, tags_code: i64) {
181 self.tags_code = tags_code;
182 }
183
184 pub fn set_properties_string(&mut self, properties_string: CheetahString) {
186 self.properties_string = properties_string;
187 }
188
189 pub fn calculate_tags_code(tags: &str) -> i64 {
193 if tags.is_empty() {
194 return 0;
195 }
196 JavaStringHasher::hash_str(tags) as i64
197 }
198
199 fn serialize_properties(properties: &std::collections::HashMap<CheetahString, CheetahString>) -> CheetahString {
201 mix_all::properties_to_string(properties)
202 }
203
204 pub fn delete_property(&mut self, name: &str) {
206 self.envelope.message_mut().clear_property(name);
207
208 self.properties_string =
209 CheetahString::from_string(MessageUtils::delete_property(&self.properties_string, name));
210 }
211
212 pub fn property(&self, name: &str) -> Option<CheetahString> {
214 self.envelope
215 .properties()
216 .get(&CheetahString::from_string(name.to_string()))
217 .cloned()
218 }
219}
220
221impl Debug for BrokerMessage {
222 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
223 f.debug_struct("BrokerMessage")
224 .field("envelope", &self.envelope)
225 .field("tags_code", &self.tags_code)
226 .field("properties_string", &self.properties_string)
227 .field("version", &self.version)
228 .field("encoded_buff", &self.encoded_buff.as_ref().map(|_| "****"))
229 .field("encode_completed", &self.encode_completed)
230 .finish()
231 }
232}
233
234impl Display for BrokerMessage {
235 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
236 write!(
237 f,
238 "BrokerMessage {{ envelope: {}, tags_code: {}, version: {} }}",
239 self.envelope, self.tags_code, self.version
240 )
241 }
242}
243
244impl Default for BrokerMessage {
245 fn default() -> Self {
246 Self {
247 envelope: MessageEnvelope::default(),
248 tags_code: 0,
249 properties_string: CheetahString::new(),
250 version: MessageVersion::V1,
251 encoded_buff: None,
252 encode_completed: false,
253 }
254 }
255}
256
257impl MessageTrait for BrokerMessage {
259 fn put_property(&mut self, key: CheetahString, value: CheetahString) {
260 self.envelope.message_mut().put_property(key, value);
261 self.properties_string = Self::serialize_properties(self.envelope.properties());
263 }
264
265 fn clear_property(&mut self, name: &str) {
266 self.delete_property(name);
267 }
268
269 fn property(&self, name: &CheetahString) -> Option<CheetahString> {
270 self.envelope.properties().get(name).cloned()
271 }
272
273 fn property_ref(&self, name: &CheetahString) -> Option<&CheetahString> {
274 self.envelope.properties().get(name)
275 }
276
277 fn topic(&self) -> &CheetahString {
278 self.envelope.topic()
279 }
280
281 fn set_topic(&mut self, topic: CheetahString) {
282 self.envelope.message_mut().set_topic(topic);
283 }
284
285 fn get_flag(&self) -> i32 {
286 self.envelope.flag()
287 }
288
289 fn set_flag(&mut self, flag: i32) {
290 self.envelope.message_mut().set_flag(flag);
291 }
292
293 fn get_body(&self) -> Option<&bytes::Bytes> {
294 self.envelope.message().get_body()
295 }
296
297 fn set_body(&mut self, body: bytes::Bytes) {
298 self.envelope.message_mut().set_body(Some(body));
299 }
300
301 fn get_properties(&self) -> &std::collections::HashMap<CheetahString, CheetahString> {
302 self.envelope.properties()
303 }
304
305 fn set_properties(&mut self, properties: std::collections::HashMap<CheetahString, CheetahString>) {
306 self.envelope.message_mut().set_properties(properties.clone());
307 self.properties_string = Self::serialize_properties(&properties);
308 }
309
310 fn transaction_id(&self) -> Option<&CheetahString> {
311 MessageTrait::transaction_id(self.envelope.message())
312 }
313
314 fn set_transaction_id(&mut self, transaction_id: CheetahString) {
315 self.envelope.message_mut().set_transaction_id(transaction_id);
316 }
317
318 fn get_compressed_body_mut(&mut self) -> Option<&mut bytes::Bytes> {
319 self.envelope.message_mut().get_compressed_body_mut()
320 }
321
322 fn get_compressed_body(&self) -> Option<&bytes::Bytes> {
323 self.envelope.message().get_compressed_body()
324 }
325
326 fn set_compressed_body_mut(&mut self, compressed_body: bytes::Bytes) {
327 self.envelope.message_mut().set_compressed_body_mut(compressed_body);
328 }
329
330 fn take_body(&mut self) -> Option<bytes::Bytes> {
331 self.envelope.message_mut().take_body()
332 }
333
334 fn as_any(&self) -> &dyn std::any::Any {
335 self
336 }
337
338 fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
339 self
340 }
341}
342
#[cfg(test)]
mod tests {
    use super::*;

    /// `new` stores the supplied parts verbatim and starts un-encoded.
    #[test]
    fn test_broker_message_creation() {
        let msg = BrokerMessage::new(
            MessageEnvelope::default(),
            123,
            CheetahString::from_static_str("key=value"),
            MessageVersion::V1,
        );

        assert_eq!(msg.tags_code(), 123);
        assert_eq!(msg.properties_string(), "key=value");
        assert_eq!(msg.version(), MessageVersion::V1);
        assert!(!msg.is_encode_completed());
    }

    /// A default envelope yields a zero tags code and no encoded state.
    #[test]
    fn test_from_envelope() {
        let msg = BrokerMessage::from_envelope(MessageEnvelope::default());

        assert_eq!(msg.tags_code(), 0);
        assert!(!msg.is_encode_completed());
    }

    /// Setting and clearing the buffer toggles both the buffer and the flag.
    #[test]
    fn test_encoded_buff() {
        let mut msg = BrokerMessage::default();
        assert!(msg.encoded_buff().is_none());

        // Attach a buffer: both the buffer and the completed flag are set.
        msg.set_encoded_buff(BytesMut::from(&b"test data"[..]));
        assert!(msg.encoded_buff().is_some());
        assert!(msg.is_encode_completed());

        // Clear it again: both go back to their initial state.
        msg.clear_encoded_buff();
        assert!(msg.encoded_buff().is_none());
        assert!(!msg.is_encode_completed());
    }

    /// Empty tags hash to zero; a non-empty tag produces a non-zero code.
    #[test]
    fn test_calculate_tags_code() {
        let tag_a_code = BrokerMessage::calculate_tags_code("TagA");
        assert_ne!(tag_a_code, 0);

        let no_tag_code = BrokerMessage::calculate_tags_code("");
        assert_eq!(no_tag_code, 0);
    }

    /// The `Display` output names the type.
    #[test]
    fn test_display() {
        let rendered = BrokerMessage::default().to_string();
        assert!(rendered.contains("BrokerMessage"));
    }
}