1use std::any::Any;
16use std::collections::HashMap;
17use std::fmt;
18use std::fmt::Display;
19use std::fmt::Formatter;
20use std::hash::DefaultHasher;
21use std::hash::Hash;
22use std::hash::Hasher;
23use std::net::SocketAddr;
24
25use bytes::Buf;
26use bytes::BufMut;
27use bytes::Bytes;
28use cheetah_string::CheetahString;
29
30use crate::common::hasher::string_hasher::JavaStringHasher;
31use crate::common::message::message_body::MessageBody;
32use crate::common::message::message_builder::MessageBuilder;
33use crate::common::message::message_ext::MessageExt;
34use crate::common::message::message_flag::MessageFlag;
35use crate::common::message::message_property::MessageProperties;
36use crate::common::message::message_property::MessagePropertyKey;
37use crate::common::message::MessageConst;
38use crate::common::message::MessageTrait;
39use crate::common::message::MessageVersion;
40use crate::common::sys_flag::message_sys_flag::MessageSysFlag;
41use crate::common::TopicFilterType;
42use crate::MessageUtils;
43
44#[derive(Clone, Debug)]
45pub struct Message {
46 topic: CheetahString,
47 flag: MessageFlag,
48 properties: MessageProperties,
49 body: MessageBody,
50 transaction_id: Option<CheetahString>,
51}
52
53impl Message {
55 pub(crate) fn from_builder(
56 topic: CheetahString,
57 body: MessageBody,
58 properties: MessageProperties,
59 flag: MessageFlag,
60 transaction_id: Option<CheetahString>,
61 ) -> Self {
62 Self {
63 topic,
64 flag,
65 properties,
66 body,
67 transaction_id,
68 }
69 }
70}
71
72impl Default for Message {
73 fn default() -> Self {
74 Self {
75 topic: CheetahString::new(),
76 flag: MessageFlag::empty(),
77 properties: MessageProperties::new(),
78 body: MessageBody::empty(),
79 transaction_id: None,
80 }
81 }
82}
83
84impl Message {
85 pub fn builder() -> MessageBuilder {
101 MessageBuilder::new()
102 }
103
104 #[deprecated(since = "0.8.0", note = "Use Message::builder() instead")]
106 #[allow(deprecated)]
107 #[inline]
108 pub fn new(topic: impl Into<CheetahString>, body: &[u8]) -> Self {
109 Self::with_details(topic, CheetahString::empty(), CheetahString::empty(), 0, body, true)
110 }
111
112 #[deprecated(since = "0.8.0", note = "Use Message::builder() instead")]
114 #[allow(deprecated)]
115 #[inline]
116 pub fn new_with_bytes(topic: impl Into<CheetahString>, body: Bytes) -> Self {
117 Self::with_details_bytes(topic, CheetahString::empty(), CheetahString::empty(), 0, body, true)
118 }
119
120 #[deprecated(since = "0.8.0", note = "Use Message::builder() instead")]
122 #[allow(deprecated)]
123 #[inline]
124 pub fn new_with_vec(topic: impl Into<CheetahString>, body: Vec<u8>) -> Self {
125 Self::with_details_bytes(
126 topic,
127 CheetahString::empty(),
128 CheetahString::empty(),
129 0,
130 Bytes::from(body),
131 true,
132 )
133 }
134
135 #[deprecated(since = "0.8.0", note = "Use Message::builder() instead")]
136 #[allow(deprecated)]
137 #[inline]
138 pub fn new_body(topic: impl Into<CheetahString>, body: Option<Bytes>) -> Self {
139 Self::with_details_body(topic, CheetahString::empty(), CheetahString::empty(), 0, body, true)
140 }
141
142 #[deprecated(since = "0.8.0", note = "Use Message::builder() instead")]
144 #[allow(deprecated)]
145 #[inline]
146 pub fn with_tags(topic: impl Into<CheetahString>, tags: impl Into<CheetahString>, body: &[u8]) -> Self {
147 Self::with_details(topic, tags, CheetahString::empty(), 0, body, true)
148 }
149
150 #[deprecated(since = "0.8.0", note = "Use Message::builder() instead")]
152 #[allow(deprecated)]
153 #[inline]
154 pub fn with_tags_bytes(topic: impl Into<CheetahString>, tags: impl Into<CheetahString>, body: Bytes) -> Self {
155 Self::with_details_bytes(topic, tags, CheetahString::empty(), 0, body, true)
156 }
157
158 #[deprecated(since = "0.8.0", note = "Use Message::builder() instead")]
160 #[allow(deprecated)]
161 #[inline]
162 pub fn with_keys(
163 topic: impl Into<CheetahString>,
164 tags: impl Into<CheetahString>,
165 keys: impl Into<CheetahString>,
166 body: &[u8],
167 ) -> Self {
168 Self::with_details(topic, tags, keys, 0, body, true)
169 }
170
171 #[deprecated(since = "0.8.0", note = "Use Message::builder() instead")]
173 #[allow(deprecated)]
174 #[inline]
175 pub fn with_keys_bytes(
176 topic: impl Into<CheetahString>,
177 tags: impl Into<CheetahString>,
178 keys: impl Into<CheetahString>,
179 body: Bytes,
180 ) -> Self {
181 Self::with_details_bytes(topic, tags, keys, 0, body, true)
182 }
183
184 #[deprecated(since = "0.8.0", note = "Use Message::builder() instead")]
186 #[allow(deprecated)]
187 pub fn with_details(
188 topic: impl Into<CheetahString>,
189 tags: impl Into<CheetahString>,
190 keys: impl Into<CheetahString>,
191 flag: i32,
192 body: &[u8],
193 wait_store_msg_ok: bool,
194 ) -> Self {
195 Self::with_details_bytes(topic, tags, keys, flag, Bytes::copy_from_slice(body), wait_store_msg_ok)
196 }
197
198 #[deprecated(since = "0.8.0", note = "Use Message::builder() instead")]
200 pub fn with_details_bytes(
201 topic: impl Into<CheetahString>,
202 tags: impl Into<CheetahString>,
203 keys: impl Into<CheetahString>,
204 flag: i32,
205 body: Bytes,
206 wait_store_msg_ok: bool,
207 ) -> Self {
208 let topic = topic.into();
209 let tags = tags.into();
210 let keys = keys.into();
211
212 let has_tags = !tags.is_empty();
214 let has_keys = !keys.is_empty();
215 let initial_capacity = (has_tags as usize) + (has_keys as usize) + 1;
216 let mut properties = HashMap::with_capacity(initial_capacity);
217
218 if has_tags {
220 properties.insert(CheetahString::from_static_str(MessageConst::PROPERTY_TAGS), tags);
221 }
222
223 if has_keys {
224 properties.insert(CheetahString::from_static_str(MessageConst::PROPERTY_KEYS), keys);
225 }
226
227 if !wait_store_msg_ok {
228 properties.insert(
229 CheetahString::from_static_str(MessageConst::PROPERTY_WAIT_STORE_MSG_OK),
230 CheetahString::from_static_str("false"),
231 );
232 }
233
234 Message {
235 topic,
236 flag: MessageFlag::from_bits(flag),
237 properties: MessageProperties::from_map(properties),
238 body: MessageBody::from(body),
239 transaction_id: None,
240 }
241 }
242
243 pub fn with_details_body(
244 topic: impl Into<CheetahString>,
245 tags: impl Into<CheetahString>,
246 keys: impl Into<CheetahString>,
247 flag: i32,
248 body: Option<Bytes>,
249 wait_store_msg_ok: bool,
250 ) -> Self {
251 let topic = topic.into();
252 let tags = tags.into();
253 let keys = keys.into();
254
255 let has_tags = !tags.is_empty();
257 let has_keys = !keys.is_empty();
258 let initial_capacity = (has_tags as usize) + (has_keys as usize) + 1;
259 let mut properties = HashMap::with_capacity(initial_capacity);
260
261 if has_tags {
262 properties.insert(CheetahString::from_static_str(MessageConst::PROPERTY_TAGS), tags);
263 }
264
265 if has_keys {
266 properties.insert(CheetahString::from_static_str(MessageConst::PROPERTY_KEYS), keys);
267 }
268
269 if !wait_store_msg_ok {
270 properties.insert(
271 CheetahString::from_static_str(MessageConst::PROPERTY_WAIT_STORE_MSG_OK),
272 CheetahString::from_static_str("false"),
273 );
274 }
275
276 Message {
277 topic,
278 flag: MessageFlag::from_bits(flag),
279 properties: MessageProperties::from_map(properties),
280 body: if let Some(b) = body {
281 MessageBody::from(b)
282 } else {
283 MessageBody::empty()
284 },
285 transaction_id: None,
286 }
287 }
288
289 #[inline]
290 pub fn set_tags(&mut self, tags: CheetahString) {
291 self.properties
292 .as_map_mut()
293 .insert(CheetahString::from_static_str(MessageConst::PROPERTY_TAGS), tags);
294 }
295
296 #[inline]
297 pub fn set_keys(&mut self, keys: CheetahString) {
298 self.properties
299 .as_map_mut()
300 .insert(CheetahString::from_static_str(MessageConst::PROPERTY_KEYS), keys);
301 }
302
303 #[inline]
304 pub fn clear_property(&mut self, name: impl Into<CheetahString>) {
305 self.properties.as_map_mut().remove(name.into().as_str());
306 }
307
308 #[inline]
309 pub fn set_properties(&mut self, properties: HashMap<CheetahString, CheetahString>) {
310 self.properties = MessageProperties::from_map(properties);
311 }
312
313 #[inline]
314 pub fn get_property(&self, key: &CheetahString) -> Option<CheetahString> {
315 self.properties.as_map().get(key).cloned()
316 }
317
318 #[inline]
319 pub fn body(&self) -> Option<bytes::Bytes> {
320 self.body.raw().cloned()
321 }
322
323 #[inline]
324 pub fn flag(&self) -> i32 {
325 self.flag.bits()
326 }
327
328 #[inline]
329 pub fn topic(&self) -> &CheetahString {
330 &self.topic
331 }
332
333 #[inline]
334 pub fn properties(&self) -> &MessageProperties {
335 &self.properties
336 }
337
338 #[inline]
339 pub fn transaction_id(&self) -> Option<&str> {
340 self.transaction_id.as_deref()
341 }
342
343 #[inline]
344 pub fn get_transaction_id(&self) -> Option<&CheetahString> {
345 self.transaction_id.as_ref()
346 }
347
348 #[inline]
349 pub fn get_tags(&self) -> Option<CheetahString> {
350 self.properties.as_map().get(MessageConst::PROPERTY_TAGS).cloned()
351 }
352
353 #[inline]
354 pub fn is_wait_store_msg_ok(&self) -> bool {
355 self.properties.wait_store_msg_ok()
356 }
357
358 #[inline]
359 fn set_wait_store_msg_ok(&mut self, wait_store_msg_ok: bool) {
360 if !wait_store_msg_ok {
361 self.properties.as_map_mut().insert(
362 CheetahString::from_static_str(MessageConst::PROPERTY_WAIT_STORE_MSG_OK),
363 CheetahString::from_static_str("false"),
364 );
365 }
366 }
367
368 #[inline]
369 pub fn delay_time_level(&self) -> i32 {
370 self.properties.delay_level().unwrap_or(0)
371 }
372
373 #[inline]
374 pub fn set_delay_time_level(&mut self, level: i32) {
375 self.properties.as_map_mut().insert(
376 CheetahString::from_static_str(MessageConst::PROPERTY_DELAY_TIME_LEVEL),
377 CheetahString::from(level.to_string()),
378 );
379 }
380
381 #[inline]
382 pub fn get_user_property(&self, name: impl Into<CheetahString>) -> Option<CheetahString> {
383 self.properties.as_map().get(name.into().as_str()).cloned()
384 }
385
386 #[inline]
387 pub fn as_any(&self) -> &dyn Any {
388 self
389 }
390
391 #[inline]
392 pub fn set_instance_id(&mut self, instance_id: impl Into<CheetahString>) {
393 self.properties.as_map_mut().insert(
394 CheetahString::from_static_str(MessageConst::PROPERTY_INSTANCE_ID),
395 instance_id.into(),
396 );
397 }
398
399 #[inline]
405 pub fn body_slice(&self) -> &[u8] {
406 self.body.as_slice()
407 }
408
409 #[inline]
411 pub fn into_body(self) -> MessageBody {
412 self.body
413 }
414
415 #[inline]
417 pub fn tags(&self) -> Option<&str> {
418 self.properties.tags()
419 }
420
421 #[inline]
423 pub fn keys(&self) -> Option<Vec<String>> {
424 self.properties.keys()
425 }
426
427 #[inline]
429 pub fn property(&self, key: &str) -> Option<&str> {
430 self.properties.as_map().get(key).map(|s| s.as_str())
431 }
432
433 #[inline]
435 pub fn message_flag(&self) -> MessageFlag {
436 self.flag
437 }
438
439 #[inline]
441 pub fn wait_store_msg_ok(&self) -> bool {
442 self.is_wait_store_msg_ok()
443 }
444
445 #[inline]
447 pub fn delay_level(&self) -> i32 {
448 self.delay_time_level()
449 }
450
451 #[inline]
453 pub fn buyer_id(&self) -> Option<&str> {
454 self.properties.buyer_id()
455 }
456
457 #[inline]
459 pub fn instance_id(&self) -> Option<&str> {
460 self.properties.instance_id()
461 }
462
463 #[doc(hidden)]
467 #[inline]
468 pub fn topic_mut(&mut self) -> &mut CheetahString {
469 &mut self.topic
470 }
471
472 #[doc(hidden)]
474 #[inline]
475 pub fn flag_mut(&mut self) -> &mut MessageFlag {
476 &mut self.flag
477 }
478
479 #[doc(hidden)]
481 #[inline]
482 pub fn properties_mut(&mut self) -> &mut MessageProperties {
483 &mut self.properties
484 }
485
486 #[doc(hidden)]
488 #[inline]
489 pub fn body_mut(&mut self) -> &mut MessageBody {
490 &mut self.body
491 }
492
493 #[doc(hidden)]
495 #[inline]
496 pub fn transaction_id_mut(&mut self) -> &mut Option<CheetahString> {
497 &mut self.transaction_id
498 }
499
500 #[doc(hidden)]
502 #[inline]
503 pub fn set_topic(&mut self, topic: CheetahString) {
504 self.topic = topic;
505 }
506
507 #[doc(hidden)]
509 #[inline]
510 pub fn set_flag(&mut self, flag: i32) {
511 self.flag = MessageFlag::from_bits(flag);
512 }
513
514 #[doc(hidden)]
516 #[inline]
517 pub fn set_body(&mut self, body: Option<Bytes>) {
518 self.body = if let Some(b) = body {
519 MessageBody::from(b)
520 } else {
521 MessageBody::empty()
522 };
523 }
524
525 #[doc(hidden)]
527 #[inline]
528 pub fn take_body(&mut self) -> Option<Bytes> {
529 let old = std::mem::take(&mut self.body);
530 old.raw().cloned()
531 }
532
533 #[doc(hidden)]
535 #[inline]
536 pub fn compressed_body(&self) -> Option<&Bytes> {
537 self.body.compressed()
538 }
539}
540
541impl Display for Message {
542 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
543 let properties_str = self
544 .properties
545 .as_map()
546 .iter()
547 .map(|(k, v)| format!("{k}: {v}"))
548 .collect::<Vec<_>>()
549 .join(", ");
550
551 let body_len = self.body.len();
552 let compressed = if self.body.is_compressed() {
553 "compressed"
554 } else {
555 "uncompressed"
556 };
557
558 let transaction_id_str = match &self.transaction_id {
559 Some(transaction_id) => format!("Some({transaction_id:?})"),
560 None => "None".to_string(),
561 };
562
563 write!(
564 f,
565 "Message {{ topic: {}, flag: {:?}, properties: {{ {} }}, body: {} bytes ({}), transaction_id: {} }}",
566 self.topic, self.flag, properties_str, body_len, compressed, transaction_id_str
567 )
568 }
569}
570
571#[allow(unused_variables)]
572impl MessageTrait for Message {
573 #[inline]
574 fn put_property(&mut self, key: CheetahString, value: CheetahString) {
575 self.properties.as_map_mut().insert(key, value);
576 }
577
578 #[inline]
579 fn clear_property(&mut self, name: &str) {
580 self.properties.as_map_mut().remove(name);
581 }
582
583 #[inline]
584 fn property(&self, name: &CheetahString) -> Option<CheetahString> {
585 self.properties.as_map().get(name).cloned()
586 }
587
588 fn property_ref(&self, name: &CheetahString) -> Option<&CheetahString> {
589 self.properties.as_map().get(name)
590 }
591
592 #[inline]
593 fn topic(&self) -> &CheetahString {
594 &self.topic
595 }
596
597 #[inline]
598 fn set_topic(&mut self, topic: CheetahString) {
599 self.set_topic(topic);
600 }
601
602 #[inline]
603 fn get_flag(&self) -> i32 {
604 self.flag.bits()
605 }
606
607 #[inline]
608 fn set_flag(&mut self, flag: i32) {
609 self.set_flag(flag);
610 }
611
612 #[inline]
613 fn get_body(&self) -> Option<&Bytes> {
614 self.body.raw()
615 }
616
617 #[inline]
618 fn set_body(&mut self, body: Bytes) {
619 self.set_body(Some(body));
620 }
621
622 #[inline]
623 fn get_properties(&self) -> &HashMap<CheetahString, CheetahString> {
624 self.properties.as_map()
625 }
626
627 #[inline]
628 fn set_properties(&mut self, properties: HashMap<CheetahString, CheetahString>) {
629 self.properties = MessageProperties::from_map(properties);
630 }
631
632 #[inline]
633 fn transaction_id(&self) -> Option<&CheetahString> {
634 self.transaction_id.as_ref()
635 }
636
637 #[inline]
638 fn set_transaction_id(&mut self, transaction_id: CheetahString) {
639 *self.transaction_id_mut() = Some(transaction_id);
640 }
641
642 #[inline]
643 fn get_compressed_body_mut(&mut self) -> Option<&mut Bytes> {
644 self.body.compressed_mut().as_mut()
645 }
646
647 #[inline]
648 fn get_compressed_body(&self) -> Option<&Bytes> {
649 self.compressed_body()
650 }
651
652 #[inline]
653 fn set_compressed_body_mut(&mut self, compressed_body: Bytes) {
654 self.body.set_compressed(compressed_body);
655 }
656
657 #[inline]
658 fn take_body(&mut self) -> Option<Bytes> {
659 self.take_body()
660 }
661
662 #[inline]
663 fn as_any(&self) -> &dyn Any {
664 self
665 }
666
667 #[inline]
668 fn as_any_mut(&mut self) -> &mut dyn Any {
669 self
670 }
671}
672
673pub fn parse_topic_filter_type(sys_flag: i32) -> TopicFilterType {
674 if (sys_flag & MessageSysFlag::MULTI_TAGS_FLAG) == MessageSysFlag::MULTI_TAGS_FLAG {
675 TopicFilterType::MultiTag
676 } else {
677 TopicFilterType::SingleTag
678 }
679}
680
681pub fn tags_string2tags_code(tags: Option<&CheetahString>) -> i64 {
682 if tags.is_none() {
683 return 0;
684 }
685 let tags = tags.unwrap();
686 if tags.is_empty() {
687 return 0;
688 }
689 JavaStringHasher::hash_str(tags.as_str()) as i64
690}
691
#[cfg(test)]
#[allow(deprecated)]
mod tests {
    use bytes::Bytes;

    use super::*;

    /// `Message::new` keeps the topic and copies the body.
    #[test]
    fn test_message_new() {
        let message = Message::new("test_topic", b"test_body");

        assert_eq!(message.topic().as_str(), "test_topic");
        let stored = message.body().unwrap();
        assert_eq!(stored.as_ref(), b"test_body");
    }

    /// `Message::new_with_bytes` stores the given `Bytes` as the body.
    #[test]
    fn test_message_new_with_bytes() {
        let payload = Bytes::from_static(b"test_body");
        let message = Message::new_with_bytes("test_topic", payload.clone());

        assert_eq!(message.topic().as_str(), "test_topic");
        assert_eq!(message.body().unwrap(), payload);
    }

    /// `Message::new_with_vec` stores the vector contents as the body.
    #[test]
    fn test_message_new_with_vec() {
        let payload = vec![1u8, 2, 3, 4, 5];
        let message = Message::new_with_vec("test_topic", payload.clone());

        assert_eq!(message.topic().as_str(), "test_topic");
        let stored = message.body().unwrap();
        assert_eq!(stored.as_ref(), payload.as_slice());
    }

    /// `Message::with_tags` records the tags property.
    #[test]
    fn test_message_with_tags() {
        let message = Message::with_tags("test_topic", "tag1", b"test_body");

        assert_eq!(message.topic().as_str(), "test_topic");
        assert_eq!(message.get_tags().unwrap().as_str(), "tag1");
    }

    /// `Message::with_tags_bytes` records the tags and keeps the body.
    #[test]
    fn test_message_with_tags_bytes() {
        let payload = Bytes::from_static(b"test_body");
        let message = Message::with_tags_bytes("test_topic", "tag1", payload.clone());

        assert_eq!(message.topic().as_str(), "test_topic");
        assert_eq!(message.get_tags().unwrap().as_str(), "tag1");
        assert_eq!(message.body().unwrap(), payload);
    }

    /// `Message::with_keys` records both the tags and the keys properties.
    #[test]
    fn test_message_with_keys() {
        let message = Message::with_keys("test_topic", "tag1", "key1", b"test_body");

        assert_eq!(message.topic().as_str(), "test_topic");
        assert_eq!(message.get_tags().unwrap().as_str(), "tag1");

        let stored_keys = message
            .properties()
            .as_map()
            .get(MessageConst::PROPERTY_KEYS)
            .unwrap();
        assert_eq!(stored_keys.as_str(), "key1");
    }

    /// `Message::with_keys_bytes` records the tags and keeps the body.
    #[test]
    fn test_message_with_keys_bytes() {
        let payload = Bytes::from_static(b"test_body");
        let message = Message::with_keys_bytes("test_topic", "tag1", "key1", payload.clone());

        assert_eq!(message.topic().as_str(), "test_topic");
        assert_eq!(message.get_tags().unwrap().as_str(), "tag1");
        assert_eq!(message.body().unwrap(), payload);
    }

    /// With `wait_store_msg_ok = true` the flag reads back as `true`.
    #[test]
    fn test_message_with_details() {
        let message = Message::with_details("test_topic", "tag1", "key1", 0, b"test_body", true);

        assert_eq!(message.topic().as_str(), "test_topic");
        assert_eq!(message.get_tags().unwrap().as_str(), "tag1");
        assert!(message.is_wait_store_msg_ok());
    }

    /// With `wait_store_msg_ok = false` the flag reads back as `false`.
    #[test]
    fn test_message_with_details_bytes() {
        let payload = Bytes::from_static(b"test_body");
        let message = Message::with_details_bytes("test_topic", "tag1", "key1", 0, payload.clone(), false);

        assert_eq!(message.topic().as_str(), "test_topic");
        assert!(!message.is_wait_store_msg_ok());
        assert_eq!(message.body().unwrap(), payload);
    }

    /// `get_transaction_id` exposes the stored id by reference.
    #[test]
    fn test_get_transaction_id_returns_cheetah_string_ref() {
        let mut message = Message::new("test_topic", b"test_body");
        message.set_transaction_id(CheetahString::from_static_str("tx-123"));

        let expected = CheetahString::from_static_str("tx-123");
        assert_eq!(message.get_transaction_id(), Some(&expected));
    }

    /// Only non-empty tags/keys and a `false` wait flag produce properties.
    #[test]
    fn test_properties_capacity_optimization() {
        // (tags, keys, wait_store_msg_ok, expected number of properties)
        let cases: [(&'static str, &'static str, bool, usize); 4] = [
            ("", "", true, 0),
            ("tag1", "", true, 1),
            ("tag1", "key1", true, 2),
            ("tag1", "key1", false, 3),
        ];

        for (tags, keys, wait_store_msg_ok, expected_len) in cases {
            let message = Message::with_details_bytes(
                "topic",
                tags,
                keys,
                0,
                Bytes::from_static(b"body"),
                wait_store_msg_ok,
            );
            assert_eq!(message.properties().len(), expected_len);
        }
    }

    /// The body handed to `new_with_bytes` is shared, not copied.
    #[test]
    fn test_zero_copy_bytes() {
        let source = Bytes::from_static(b"test_data");
        let message = Message::new_with_bytes("topic", source.clone());

        let stored = message.body().unwrap();
        assert_eq!(stored.as_ptr(), source.as_ptr());
    }

    /// `put_user_property` rejects blank names/values and system keys, and
    /// accepts an ordinary key/value pair.
    #[test]
    fn test_put_user_property_error_handling() {
        use rocketmq_error::RocketMQError;

        let mut message = Message::new("test_topic", b"test body");

        // Blank name.
        let blank_name = message.put_user_property(CheetahString::empty(), CheetahString::from_slice("value"));
        assert!(blank_name.is_err());
        if let Err(RocketMQError::InvalidProperty(reason)) = blank_name {
            assert!(reason.contains("null or blank"));
        }

        // Blank value.
        let blank_value = message.put_user_property(CheetahString::from_slice("name"), CheetahString::empty());
        assert!(blank_value.is_err());
        if let Err(RocketMQError::InvalidProperty(reason)) = blank_value {
            assert!(reason.contains("null or blank"));
        }

        // Name that collides with a system property.
        let system_key =
            message.put_user_property(CheetahString::from_slice("KEYS"), CheetahString::from_slice("value"));
        assert!(system_key.is_err());
        if let Err(RocketMQError::InvalidProperty(reason)) = system_key {
            assert!(reason.contains("used by system"));
        }

        // Ordinary user property.
        let accepted = message.put_user_property(
            CheetahString::from_slice("my_custom_key"),
            CheetahString::from_slice("my_value"),
        );
        assert!(accepted.is_ok());

        let stored = message
            .get_user_property(CheetahString::from_slice("my_custom_key"))
            .unwrap();
        assert_eq!(stored.as_str(), "my_value");
    }
}