Skip to main content

rocketmq_common/common/message/
message_single.rs

1// Copyright 2023 The RocketMQ Rust Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::collections::HashMap;
17use std::fmt;
18use std::fmt::Display;
19use std::fmt::Formatter;
20use std::hash::DefaultHasher;
21use std::hash::Hash;
22use std::hash::Hasher;
23use std::net::SocketAddr;
24
25use bytes::Buf;
26use bytes::BufMut;
27use bytes::Bytes;
28use cheetah_string::CheetahString;
29
30use crate::common::hasher::string_hasher::JavaStringHasher;
31use crate::common::message::message_body::MessageBody;
32use crate::common::message::message_builder::MessageBuilder;
33use crate::common::message::message_ext::MessageExt;
34use crate::common::message::message_flag::MessageFlag;
35use crate::common::message::message_property::MessageProperties;
36use crate::common::message::message_property::MessagePropertyKey;
37use crate::common::message::MessageConst;
38use crate::common::message::MessageTrait;
39use crate::common::message::MessageVersion;
40use crate::common::sys_flag::message_sys_flag::MessageSysFlag;
41use crate::common::TopicFilterType;
42use crate::MessageUtils;
43
/// A single message: a topic, a flag, a property map, a body and an optional
/// transaction id.
///
/// New code should construct values through `Message::builder()`.
#[derive(Clone, Debug)]
pub struct Message {
    // Topic the message is addressed to.
    topic: CheetahString,
    // Flag bits; `Message::flag` exposes them as a raw `i32`.
    flag: MessageFlag,
    // String key/value properties (tags, keys, delay level, ...).
    properties: MessageProperties,
    // Message payload.
    body: MessageBody,
    // Transaction id, when one has been assigned.
    transaction_id: Option<CheetahString>,
}
52
53// Private internal constructor used by builder
54impl Message {
55    pub(crate) fn from_builder(
56        topic: CheetahString,
57        body: MessageBody,
58        properties: MessageProperties,
59        flag: MessageFlag,
60        transaction_id: Option<CheetahString>,
61    ) -> Self {
62        Self {
63            topic,
64            flag,
65            properties,
66            body,
67            transaction_id,
68        }
69    }
70}
71
72impl Default for Message {
73    fn default() -> Self {
74        Self {
75            topic: CheetahString::new(),
76            flag: MessageFlag::empty(),
77            properties: MessageProperties::new(),
78            body: MessageBody::empty(),
79            transaction_id: None,
80        }
81    }
82}
83
84impl Message {
    /// Creates a new message builder.
    ///
    /// This is the recommended way to create messages. The returned value is a
    /// fresh builder obtained from `MessageBuilder::new()`.
    ///
    /// # Examples
    ///
    /// ```
    /// use rocketmq_common::common::message::message_single::Message;
    ///
    /// let msg = Message::builder()
    ///     .topic("test-topic")
    ///     .body_slice(b"hello world")
    ///     .tags("important")
    ///     .build_unchecked();
    /// ```
    pub fn builder() -> MessageBuilder {
        MessageBuilder::new()
    }
103
104    /// Create a new message with topic and body slice (will copy)
105    #[deprecated(since = "0.8.0", note = "Use Message::builder() instead")]
106    #[allow(deprecated)]
107    #[inline]
108    pub fn new(topic: impl Into<CheetahString>, body: &[u8]) -> Self {
109        Self::with_details(topic, CheetahString::empty(), CheetahString::empty(), 0, body, true)
110    }
111
112    /// Create a new message with topic and Bytes (zero-copy)
113    #[deprecated(since = "0.8.0", note = "Use Message::builder() instead")]
114    #[allow(deprecated)]
115    #[inline]
116    pub fn new_with_bytes(topic: impl Into<CheetahString>, body: Bytes) -> Self {
117        Self::with_details_bytes(topic, CheetahString::empty(), CheetahString::empty(), 0, body, true)
118    }
119
120    /// Create a new message with topic and Vec<u8> (zero-copy conversion to Bytes)
121    #[deprecated(since = "0.8.0", note = "Use Message::builder() instead")]
122    #[allow(deprecated)]
123    #[inline]
124    pub fn new_with_vec(topic: impl Into<CheetahString>, body: Vec<u8>) -> Self {
125        Self::with_details_bytes(
126            topic,
127            CheetahString::empty(),
128            CheetahString::empty(),
129            0,
130            Bytes::from(body),
131            true,
132        )
133    }
134
135    #[deprecated(since = "0.8.0", note = "Use Message::builder() instead")]
136    #[allow(deprecated)]
137    #[inline]
138    pub fn new_body(topic: impl Into<CheetahString>, body: Option<Bytes>) -> Self {
139        Self::with_details_body(topic, CheetahString::empty(), CheetahString::empty(), 0, body, true)
140    }
141
142    /// Create a message with tags and body slice (will copy)
143    #[deprecated(since = "0.8.0", note = "Use Message::builder() instead")]
144    #[allow(deprecated)]
145    #[inline]
146    pub fn with_tags(topic: impl Into<CheetahString>, tags: impl Into<CheetahString>, body: &[u8]) -> Self {
147        Self::with_details(topic, tags, CheetahString::empty(), 0, body, true)
148    }
149
150    /// Create a message with tags and Bytes (zero-copy)
151    #[deprecated(since = "0.8.0", note = "Use Message::builder() instead")]
152    #[allow(deprecated)]
153    #[inline]
154    pub fn with_tags_bytes(topic: impl Into<CheetahString>, tags: impl Into<CheetahString>, body: Bytes) -> Self {
155        Self::with_details_bytes(topic, tags, CheetahString::empty(), 0, body, true)
156    }
157
158    /// Create a message with keys and body slice (will copy)
159    #[deprecated(since = "0.8.0", note = "Use Message::builder() instead")]
160    #[allow(deprecated)]
161    #[inline]
162    pub fn with_keys(
163        topic: impl Into<CheetahString>,
164        tags: impl Into<CheetahString>,
165        keys: impl Into<CheetahString>,
166        body: &[u8],
167    ) -> Self {
168        Self::with_details(topic, tags, keys, 0, body, true)
169    }
170
171    /// Create a message with keys and Bytes (zero-copy)
172    #[deprecated(since = "0.8.0", note = "Use Message::builder() instead")]
173    #[allow(deprecated)]
174    #[inline]
175    pub fn with_keys_bytes(
176        topic: impl Into<CheetahString>,
177        tags: impl Into<CheetahString>,
178        keys: impl Into<CheetahString>,
179        body: Bytes,
180    ) -> Self {
181        Self::with_details_bytes(topic, tags, keys, 0, body, true)
182    }
183
184    /// Create message with body slice (will copy data)
185    #[deprecated(since = "0.8.0", note = "Use Message::builder() instead")]
186    #[allow(deprecated)]
187    pub fn with_details(
188        topic: impl Into<CheetahString>,
189        tags: impl Into<CheetahString>,
190        keys: impl Into<CheetahString>,
191        flag: i32,
192        body: &[u8],
193        wait_store_msg_ok: bool,
194    ) -> Self {
195        Self::with_details_bytes(topic, tags, keys, flag, Bytes::copy_from_slice(body), wait_store_msg_ok)
196    }
197
198    /// Create message with Bytes (zero-copy)
199    #[deprecated(since = "0.8.0", note = "Use Message::builder() instead")]
200    pub fn with_details_bytes(
201        topic: impl Into<CheetahString>,
202        tags: impl Into<CheetahString>,
203        keys: impl Into<CheetahString>,
204        flag: i32,
205        body: Bytes,
206        wait_store_msg_ok: bool,
207    ) -> Self {
208        let topic = topic.into();
209        let tags = tags.into();
210        let keys = keys.into();
211
212        // Pre-allocate HashMap with estimated capacity to avoid reallocation
213        let has_tags = !tags.is_empty();
214        let has_keys = !keys.is_empty();
215        let initial_capacity = (has_tags as usize) + (has_keys as usize) + 1;
216        let mut properties = HashMap::with_capacity(initial_capacity);
217
218        // Use static strings for keys to avoid allocations
219        if has_tags {
220            properties.insert(CheetahString::from_static_str(MessageConst::PROPERTY_TAGS), tags);
221        }
222
223        if has_keys {
224            properties.insert(CheetahString::from_static_str(MessageConst::PROPERTY_KEYS), keys);
225        }
226
227        if !wait_store_msg_ok {
228            properties.insert(
229                CheetahString::from_static_str(MessageConst::PROPERTY_WAIT_STORE_MSG_OK),
230                CheetahString::from_static_str("false"),
231            );
232        }
233
234        Message {
235            topic,
236            flag: MessageFlag::from_bits(flag),
237            properties: MessageProperties::from_map(properties),
238            body: MessageBody::from(body),
239            transaction_id: None,
240        }
241    }
242
243    pub fn with_details_body(
244        topic: impl Into<CheetahString>,
245        tags: impl Into<CheetahString>,
246        keys: impl Into<CheetahString>,
247        flag: i32,
248        body: Option<Bytes>,
249        wait_store_msg_ok: bool,
250    ) -> Self {
251        let topic = topic.into();
252        let tags = tags.into();
253        let keys = keys.into();
254
255        // Pre-allocate HashMap with estimated capacity
256        let has_tags = !tags.is_empty();
257        let has_keys = !keys.is_empty();
258        let initial_capacity = (has_tags as usize) + (has_keys as usize) + 1;
259        let mut properties = HashMap::with_capacity(initial_capacity);
260
261        if has_tags {
262            properties.insert(CheetahString::from_static_str(MessageConst::PROPERTY_TAGS), tags);
263        }
264
265        if has_keys {
266            properties.insert(CheetahString::from_static_str(MessageConst::PROPERTY_KEYS), keys);
267        }
268
269        if !wait_store_msg_ok {
270            properties.insert(
271                CheetahString::from_static_str(MessageConst::PROPERTY_WAIT_STORE_MSG_OK),
272                CheetahString::from_static_str("false"),
273            );
274        }
275
276        Message {
277            topic,
278            flag: MessageFlag::from_bits(flag),
279            properties: MessageProperties::from_map(properties),
280            body: if let Some(b) = body {
281                MessageBody::from(b)
282            } else {
283                MessageBody::empty()
284            },
285            transaction_id: None,
286        }
287    }
288
289    #[inline]
290    pub fn set_tags(&mut self, tags: CheetahString) {
291        self.properties
292            .as_map_mut()
293            .insert(CheetahString::from_static_str(MessageConst::PROPERTY_TAGS), tags);
294    }
295
296    #[inline]
297    pub fn set_keys(&mut self, keys: CheetahString) {
298        self.properties
299            .as_map_mut()
300            .insert(CheetahString::from_static_str(MessageConst::PROPERTY_KEYS), keys);
301    }
302
303    #[inline]
304    pub fn clear_property(&mut self, name: impl Into<CheetahString>) {
305        self.properties.as_map_mut().remove(name.into().as_str());
306    }
307
    /// Replaces the whole property set with one built from `properties` via
    /// `MessageProperties::from_map`; previously stored properties are dropped.
    #[inline]
    pub fn set_properties(&mut self, properties: HashMap<CheetahString, CheetahString>) {
        self.properties = MessageProperties::from_map(properties);
    }
312
    /// Looks up `key` in the property map and returns a clone of the value,
    /// or `None` when the key is absent.
    #[inline]
    pub fn get_property(&self, key: &CheetahString) -> Option<CheetahString> {
        self.properties.as_map().get(key).cloned()
    }
317
    /// Returns a clone of the `Bytes` handle reported by `MessageBody::raw`,
    /// or `None` when `raw` reports none.
    ///
    /// See `body_slice` for a borrowing alternative.
    #[inline]
    pub fn body(&self) -> Option<bytes::Bytes> {
        self.body.raw().cloned()
    }
322
    /// Returns the flag as its raw `i32` bits (`MessageFlag::bits`).
    #[inline]
    pub fn flag(&self) -> i32 {
        self.flag.bits()
    }
327
    /// Returns a reference to the message topic.
    #[inline]
    pub fn topic(&self) -> &CheetahString {
        &self.topic
    }
332
    /// Returns a shared reference to the message properties.
    #[inline]
    pub fn properties(&self) -> &MessageProperties {
        &self.properties
    }
337
    /// Returns the transaction id as a string slice, or `None` when no id is set.
    #[inline]
    pub fn transaction_id(&self) -> Option<&str> {
        self.transaction_id.as_deref()
    }
342
    /// Returns the transaction id as a `&CheetahString`, or `None` when no id is set.
    ///
    /// Same data as `transaction_id`, without the conversion to `&str`.
    #[inline]
    pub fn get_transaction_id(&self) -> Option<&CheetahString> {
        self.transaction_id.as_ref()
    }
347
    /// Returns a clone of the value stored under `MessageConst::PROPERTY_TAGS`,
    /// or `None` when that property is absent.
    #[inline]
    pub fn get_tags(&self) -> Option<CheetahString> {
        self.properties.as_map().get(MessageConst::PROPERTY_TAGS).cloned()
    }
352
    /// Returns the wait-store flag as reported by
    /// `MessageProperties::wait_store_msg_ok`.
    #[inline]
    pub fn is_wait_store_msg_ok(&self) -> bool {
        self.properties.wait_store_msg_ok()
    }
357
358    #[inline]
359    fn set_wait_store_msg_ok(&mut self, wait_store_msg_ok: bool) {
360        if !wait_store_msg_ok {
361            self.properties.as_map_mut().insert(
362                CheetahString::from_static_str(MessageConst::PROPERTY_WAIT_STORE_MSG_OK),
363                CheetahString::from_static_str("false"),
364            );
365        }
366    }
367
    /// Returns the delay level reported by `MessageProperties::delay_level`,
    /// falling back to `0` when it reports `None`.
    #[inline]
    pub fn delay_time_level(&self) -> i32 {
        self.properties.delay_level().unwrap_or(0)
    }
372
373    #[inline]
374    pub fn set_delay_time_level(&mut self, level: i32) {
375        self.properties.as_map_mut().insert(
376            CheetahString::from_static_str(MessageConst::PROPERTY_DELAY_TIME_LEVEL),
377            CheetahString::from(level.to_string()),
378        );
379    }
380
381    #[inline]
382    pub fn get_user_property(&self, name: impl Into<CheetahString>) -> Option<CheetahString> {
383        self.properties.as_map().get(name.into().as_str()).cloned()
384    }
385
    /// Returns `self` as a `&dyn Any`.
    #[inline]
    pub fn as_any(&self) -> &dyn Any {
        self
    }
390
391    #[inline]
392    pub fn set_instance_id(&mut self, instance_id: impl Into<CheetahString>) {
393        self.properties.as_map_mut().insert(
394            CheetahString::from_static_str(MessageConst::PROPERTY_INSTANCE_ID),
395            instance_id.into(),
396        );
397    }
398
399    // ===== New Rust-idiomatic API methods =====
400
    /// Returns the message body as a byte slice (borrows).
    ///
    /// This is the recommended way to access the message body. The slice comes
    /// from `MessageBody::as_slice` and borrows from `self`; nothing is cloned.
    #[inline]
    pub fn body_slice(&self) -> &[u8] {
        self.body.as_slice()
    }
408
    /// Consumes the message and returns the body.
    ///
    /// Topic, flag, properties and transaction id are dropped.
    #[inline]
    pub fn into_body(self) -> MessageBody {
        self.body
    }
414
    /// Returns the message tags.
    ///
    /// Delegates to `MessageProperties::tags`; the result borrows from `self`.
    #[inline]
    pub fn tags(&self) -> Option<&str> {
        self.properties.tags()
    }
420
    /// Returns the message keys as a vector.
    ///
    /// Delegates to `MessageProperties::keys`; the returned strings are owned.
    #[inline]
    pub fn keys(&self) -> Option<Vec<String>> {
        self.properties.keys()
    }
426
    /// Returns a property value by key.
    ///
    /// The value is borrowed from the property map; `None` means the key is absent.
    #[inline]
    pub fn property(&self, key: &str) -> Option<&str> {
        self.properties.as_map().get(key).map(|s| s.as_str())
    }
432
    /// Returns the message flag as a type-safe MessageFlag.
    ///
    /// The flag is returned by value; see `flag` for the raw `i32` form.
    #[inline]
    pub fn message_flag(&self) -> MessageFlag {
        self.flag
    }
438
439    /// Returns whether to wait for store confirmation.
440    #[inline]
441    pub fn wait_store_msg_ok(&self) -> bool {
442        self.is_wait_store_msg_ok()
443    }
444
445    /// Returns the delay time level.
446    #[inline]
447    pub fn delay_level(&self) -> i32 {
448        self.delay_time_level()
449    }
450
    /// Returns the buyer ID.
    ///
    /// Delegates to `MessageProperties::buyer_id`; the result borrows from `self`.
    #[inline]
    pub fn buyer_id(&self) -> Option<&str> {
        self.properties.buyer_id()
    }
456
    /// Returns the instance ID.
    ///
    /// Delegates to `MessageProperties::instance_id`; the result borrows from `self`.
    #[inline]
    pub fn instance_id(&self) -> Option<&str> {
        self.properties.instance_id()
    }
462
463    // ===== Internal accessors for other modules =====
464
    /// Returns a mutable reference to the topic (internal use only).
    ///
    /// Hidden from rustdoc; the method is `pub` so other modules can reach it.
    #[doc(hidden)]
    #[inline]
    pub fn topic_mut(&mut self) -> &mut CheetahString {
        &mut self.topic
    }
471
    /// Returns a mutable reference to the flag (internal use only).
    ///
    /// Hidden from rustdoc; the method is `pub` so other modules can reach it.
    #[doc(hidden)]
    #[inline]
    pub fn flag_mut(&mut self) -> &mut MessageFlag {
        &mut self.flag
    }
478
    /// Returns a mutable reference to properties (internal use only).
    ///
    /// Hidden from rustdoc; the method is `pub` so other modules can reach it.
    #[doc(hidden)]
    #[inline]
    pub fn properties_mut(&mut self) -> &mut MessageProperties {
        &mut self.properties
    }
485
    /// Returns a mutable reference to the body (internal use only).
    ///
    /// Hidden from rustdoc; the method is `pub` so other modules can reach it.
    #[doc(hidden)]
    #[inline]
    pub fn body_mut(&mut self) -> &mut MessageBody {
        &mut self.body
    }
492
    /// Returns a mutable reference to the transaction ID (internal use only).
    ///
    /// The reference is to the whole `Option`, so callers can set or clear the id.
    #[doc(hidden)]
    #[inline]
    pub fn transaction_id_mut(&mut self) -> &mut Option<CheetahString> {
        &mut self.transaction_id
    }
499
    /// Sets the topic (internal use only).
    ///
    /// The previous topic is overwritten.
    #[doc(hidden)]
    #[inline]
    pub fn set_topic(&mut self, topic: CheetahString) {
        self.topic = topic;
    }
506
    /// Sets the flag (internal use only).
    ///
    /// The raw `i32` is converted with `MessageFlag::from_bits`.
    #[doc(hidden)]
    #[inline]
    pub fn set_flag(&mut self, flag: i32) {
        self.flag = MessageFlag::from_bits(flag);
    }
513
514    /// Sets the body (internal use only).
515    #[doc(hidden)]
516    #[inline]
517    pub fn set_body(&mut self, body: Option<Bytes>) {
518        self.body = if let Some(b) = body {
519            MessageBody::from(b)
520        } else {
521            MessageBody::empty()
522        };
523    }
524
525    /// Takes ownership of the body, leaving empty (internal use only).
526    #[doc(hidden)]
527    #[inline]
528    pub fn take_body(&mut self) -> Option<Bytes> {
529        let old = std::mem::take(&mut self.body);
530        old.raw().cloned()
531    }
532
    /// Returns a reference to the compressed body (internal use only).
    ///
    /// Delegates to `MessageBody::compressed`.
    #[doc(hidden)]
    #[inline]
    pub fn compressed_body(&self) -> Option<&Bytes> {
        self.body.compressed()
    }
539}
540
541impl Display for Message {
542    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
543        let properties_str = self
544            .properties
545            .as_map()
546            .iter()
547            .map(|(k, v)| format!("{k}: {v}"))
548            .collect::<Vec<_>>()
549            .join(", ");
550
551        let body_len = self.body.len();
552        let compressed = if self.body.is_compressed() {
553            "compressed"
554        } else {
555            "uncompressed"
556        };
557
558        let transaction_id_str = match &self.transaction_id {
559            Some(transaction_id) => format!("Some({transaction_id:?})"),
560            None => "None".to_string(),
561        };
562
563        write!(
564            f,
565            "Message {{ topic: {}, flag: {:?}, properties: {{ {} }}, body: {} bytes ({}), transaction_id: {} }}",
566            self.topic, self.flag, properties_str, body_len, compressed, transaction_id_str
567        )
568    }
569}
570
571#[allow(unused_variables)]
572impl MessageTrait for Message {
573    #[inline]
574    fn put_property(&mut self, key: CheetahString, value: CheetahString) {
575        self.properties.as_map_mut().insert(key, value);
576    }
577
578    #[inline]
579    fn clear_property(&mut self, name: &str) {
580        self.properties.as_map_mut().remove(name);
581    }
582
583    #[inline]
584    fn property(&self, name: &CheetahString) -> Option<CheetahString> {
585        self.properties.as_map().get(name).cloned()
586    }
587
588    fn property_ref(&self, name: &CheetahString) -> Option<&CheetahString> {
589        self.properties.as_map().get(name)
590    }
591
592    #[inline]
593    fn topic(&self) -> &CheetahString {
594        &self.topic
595    }
596
597    #[inline]
598    fn set_topic(&mut self, topic: CheetahString) {
599        self.set_topic(topic);
600    }
601
602    #[inline]
603    fn get_flag(&self) -> i32 {
604        self.flag.bits()
605    }
606
607    #[inline]
608    fn set_flag(&mut self, flag: i32) {
609        self.set_flag(flag);
610    }
611
612    #[inline]
613    fn get_body(&self) -> Option<&Bytes> {
614        self.body.raw()
615    }
616
617    #[inline]
618    fn set_body(&mut self, body: Bytes) {
619        self.set_body(Some(body));
620    }
621
622    #[inline]
623    fn get_properties(&self) -> &HashMap<CheetahString, CheetahString> {
624        self.properties.as_map()
625    }
626
627    #[inline]
628    fn set_properties(&mut self, properties: HashMap<CheetahString, CheetahString>) {
629        self.properties = MessageProperties::from_map(properties);
630    }
631
632    #[inline]
633    fn transaction_id(&self) -> Option<&CheetahString> {
634        self.transaction_id.as_ref()
635    }
636
637    #[inline]
638    fn set_transaction_id(&mut self, transaction_id: CheetahString) {
639        *self.transaction_id_mut() = Some(transaction_id);
640    }
641
642    #[inline]
643    fn get_compressed_body_mut(&mut self) -> Option<&mut Bytes> {
644        self.body.compressed_mut().as_mut()
645    }
646
647    #[inline]
648    fn get_compressed_body(&self) -> Option<&Bytes> {
649        self.compressed_body()
650    }
651
652    #[inline]
653    fn set_compressed_body_mut(&mut self, compressed_body: Bytes) {
654        self.body.set_compressed(compressed_body);
655    }
656
657    #[inline]
658    fn take_body(&mut self) -> Option<Bytes> {
659        self.take_body()
660    }
661
662    #[inline]
663    fn as_any(&self) -> &dyn Any {
664        self
665    }
666
667    #[inline]
668    fn as_any_mut(&mut self) -> &mut dyn Any {
669        self
670    }
671}
672
673pub fn parse_topic_filter_type(sys_flag: i32) -> TopicFilterType {
674    if (sys_flag & MessageSysFlag::MULTI_TAGS_FLAG) == MessageSysFlag::MULTI_TAGS_FLAG {
675        TopicFilterType::MultiTag
676    } else {
677        TopicFilterType::SingleTag
678    }
679}
680
681pub fn tags_string2tags_code(tags: Option<&CheetahString>) -> i64 {
682    if tags.is_none() {
683        return 0;
684    }
685    let tags = tags.unwrap();
686    if tags.is_empty() {
687        return 0;
688    }
689    JavaStringHasher::hash_str(tags.as_str()) as i64
690}
691
// The deprecated constructors are exercised on purpose: they are still public
// API and must keep working until they are removed.
#[cfg(test)]
#[allow(deprecated)]
mod tests {
    use bytes::Bytes;

    use super::*;

    #[test]
    fn test_message_new() {
        let msg = Message::new("test_topic", b"test_body");
        assert_eq!(msg.topic().as_str(), "test_topic");
        assert_eq!(msg.body().unwrap().as_ref(), b"test_body");
    }

    #[test]
    fn test_message_new_with_bytes() {
        let body = Bytes::from_static(b"test_body");
        let msg = Message::new_with_bytes("test_topic", body.clone());
        assert_eq!(msg.topic().as_str(), "test_topic");
        assert_eq!(msg.body().unwrap(), body);
    }

    #[test]
    fn test_message_new_with_vec() {
        let body = vec![1u8, 2, 3, 4, 5];
        let msg = Message::new_with_vec("test_topic", body.clone());
        assert_eq!(msg.topic().as_str(), "test_topic");
        assert_eq!(msg.body().unwrap().as_ref(), body.as_slice());
    }

    #[test]
    fn test_message_with_tags() {
        let msg = Message::with_tags("test_topic", "tag1", b"test_body");
        assert_eq!(msg.topic().as_str(), "test_topic");
        assert_eq!(msg.get_tags().unwrap().as_str(), "tag1");
    }

    #[test]
    fn test_message_with_tags_bytes() {
        let body = Bytes::from_static(b"test_body");
        let msg = Message::with_tags_bytes("test_topic", "tag1", body.clone());
        assert_eq!(msg.topic().as_str(), "test_topic");
        assert_eq!(msg.get_tags().unwrap().as_str(), "tag1");
        assert_eq!(msg.body().unwrap(), body);
    }

    #[test]
    fn test_message_with_keys() {
        let msg = Message::with_keys("test_topic", "tag1", "key1", b"test_body");
        assert_eq!(msg.topic().as_str(), "test_topic");
        assert_eq!(msg.get_tags().unwrap().as_str(), "tag1");
        assert_eq!(
            msg.properties()
                .as_map()
                .get(MessageConst::PROPERTY_KEYS)
                .unwrap()
                .as_str(),
            "key1"
        );
    }

    #[test]
    fn test_message_with_keys_bytes() {
        let body = Bytes::from_static(b"test_body");
        let msg = Message::with_keys_bytes("test_topic", "tag1", "key1", body.clone());
        assert_eq!(msg.topic().as_str(), "test_topic");
        assert_eq!(msg.get_tags().unwrap().as_str(), "tag1");
        assert_eq!(msg.body().unwrap(), body);
    }

    #[test]
    fn test_message_with_details() {
        let msg = Message::with_details("test_topic", "tag1", "key1", 0, b"test_body", true);
        assert_eq!(msg.topic().as_str(), "test_topic");
        assert_eq!(msg.get_tags().unwrap().as_str(), "tag1");
        assert!(msg.is_wait_store_msg_ok());
    }

    #[test]
    fn test_message_with_details_bytes() {
        let body = Bytes::from_static(b"test_body");
        let msg = Message::with_details_bytes("test_topic", "tag1", "key1", 0, body.clone(), false);
        assert_eq!(msg.topic().as_str(), "test_topic");
        assert!(!msg.is_wait_store_msg_ok());
        assert_eq!(msg.body().unwrap(), body);
    }

    #[test]
    fn test_get_transaction_id_returns_cheetah_string_ref() {
        let mut msg = Message::new("test_topic", b"test_body");
        msg.set_transaction_id(CheetahString::from_static_str("tx-123"));

        let transaction_id = msg.get_transaction_id();

        assert_eq!(transaction_id, Some(&CheetahString::from_static_str("tx-123")));
    }

    #[test]
    fn test_properties_capacity_optimization() {
        // Test with no tags or keys
        let msg1 = Message::with_details_bytes(
            "topic",
            CheetahString::empty(),
            CheetahString::empty(),
            0,
            Bytes::from_static(b"body"),
            true,
        );
        assert_eq!(msg1.properties().len(), 0);

        // Test with tags
        let msg2 = Message::with_details_bytes(
            "topic",
            "tag1",
            CheetahString::empty(),
            0,
            Bytes::from_static(b"body"),
            true,
        );
        assert_eq!(msg2.properties().len(), 1);

        // Test with tags and keys
        let msg3 = Message::with_details_bytes("topic", "tag1", "key1", 0, Bytes::from_static(b"body"), true);
        assert_eq!(msg3.properties().len(), 2);

        // Test with wait_store_msg_ok = false
        let msg4 = Message::with_details_bytes("topic", "tag1", "key1", 0, Bytes::from_static(b"body"), false);
        assert_eq!(msg4.properties().len(), 3);
    }

    #[test]
    fn test_zero_copy_bytes() {
        let original_bytes = Bytes::from_static(b"test_data");
        let bytes_clone = original_bytes.clone();

        // Creating message with Bytes should not copy
        let msg = Message::new_with_bytes("topic", bytes_clone);

        // The body should share the same underlying data
        let body = msg.body().unwrap();
        assert_eq!(body.as_ptr(), original_bytes.as_ptr());
    }

    #[test]
    fn test_put_user_property_error_handling() {
        use rocketmq_error::RocketMQError;

        let mut msg = Message::new("test_topic", b"test body");

        // Each failure case asserts variant and message together, so a
        // different error variant fails the test instead of silently skipping
        // the message check.

        // Test empty name
        let result = msg.put_user_property(CheetahString::empty(), CheetahString::from_slice("value"));
        assert!(
            matches!(&result, Err(RocketMQError::InvalidProperty(e)) if e.contains("null or blank")),
            "empty name must be rejected with InvalidProperty mentioning 'null or blank'"
        );

        // Test empty value
        let result = msg.put_user_property(CheetahString::from_slice("name"), CheetahString::empty());
        assert!(
            matches!(&result, Err(RocketMQError::InvalidProperty(e)) if e.contains("null or blank")),
            "empty value must be rejected with InvalidProperty mentioning 'null or blank'"
        );

        // Test system reserved property
        let result = msg.put_user_property(CheetahString::from_slice("KEYS"), CheetahString::from_slice("value"));
        assert!(
            matches!(&result, Err(RocketMQError::InvalidProperty(e)) if e.contains("used by system")),
            "reserved name must be rejected with InvalidProperty mentioning 'used by system'"
        );

        // Test valid user property
        let result = msg.put_user_property(
            CheetahString::from_slice("my_custom_key"),
            CheetahString::from_slice("my_value"),
        );
        assert!(result.is_ok());
        assert_eq!(
            msg.get_user_property(CheetahString::from_slice("my_custom_key"))
                .unwrap()
                .as_str(),
            "my_value"
        );
    }
}