rocketmq_client_rust/utils/
message_util.rs1use std::sync::LazyLock;
16
17use bytes::Bytes;
18use cheetah_string::CheetahString;
19use rocketmq_common::common::message::message_single::Message;
20use rocketmq_common::common::message::MessageConst;
21use rocketmq_common::common::mix_all;
22use rocketmq_common::MessageAccessor::MessageAccessor;
23
24use crate::common::client_error_code::ClientErrorCode;
25
26static PROPERTY_CLUSTER: LazyLock<CheetahString> =
28 LazyLock::new(|| CheetahString::from_static_str(MessageConst::PROPERTY_CLUSTER));
29static PROPERTY_MESSAGE_TYPE: LazyLock<CheetahString> =
30 LazyLock::new(|| CheetahString::from_static_str(MessageConst::PROPERTY_MESSAGE_TYPE));
31static REPLY_MESSAGE_FLAG: LazyLock<CheetahString> =
32 LazyLock::new(|| CheetahString::from_static_str(mix_all::REPLY_MESSAGE_FLAG));
33static PROPERTY_MESSAGE_REPLY_TO_CLIENT: LazyLock<CheetahString> =
34 LazyLock::new(|| CheetahString::from_static_str(MessageConst::PROPERTY_MESSAGE_REPLY_TO_CLIENT));
35static PROPERTY_CORRELATION_ID: LazyLock<CheetahString> =
36 LazyLock::new(|| CheetahString::from_static_str(MessageConst::PROPERTY_CORRELATION_ID));
37static PROPERTY_MESSAGE_TTL: LazyLock<CheetahString> =
38 LazyLock::new(|| CheetahString::from_static_str(MessageConst::PROPERTY_MESSAGE_TTL));
39
40pub struct MessageUtil;
41
42impl MessageUtil {
43 pub fn create_reply_message(request_message: &Message, body: &[u8]) -> rocketmq_error::RocketMQResult<Message> {
44 let Some(cluster) = request_message.property(&PROPERTY_CLUSTER) else {
46 return Err(mq_client_err!(
47 ClientErrorCode::CREATE_REPLY_MESSAGE_EXCEPTION,
48 format!(
49 "create reply message fail, requestMessage error, property[{}] is null.",
50 MessageConst::PROPERTY_CLUSTER
51 )
52 ));
53 };
54
55 let mut reply_message = Message::default();
56 reply_message.set_body(Some(Bytes::copy_from_slice(body)));
57
58 let reply_topic = mix_all::get_retry_topic(cluster);
59 reply_message.set_topic(CheetahString::from_string(reply_topic));
60
61 MessageAccessor::put_property(
63 &mut reply_message,
64 PROPERTY_MESSAGE_TYPE.clone(),
65 REPLY_MESSAGE_FLAG.clone(),
66 );
67
68 if let Some(reply_to) = request_message.property(&PROPERTY_MESSAGE_REPLY_TO_CLIENT) {
70 MessageAccessor::put_property(
71 &mut reply_message,
72 PROPERTY_MESSAGE_REPLY_TO_CLIENT.clone(),
73 CheetahString::from_slice(reply_to),
74 );
75 }
76
77 if let Some(correlation_id) = request_message.property(&PROPERTY_CORRELATION_ID) {
78 MessageAccessor::put_property(
79 &mut reply_message,
80 PROPERTY_CORRELATION_ID.clone(),
81 CheetahString::from_slice(correlation_id),
82 );
83 }
84
85 if let Some(ttl) = request_message.property(&PROPERTY_MESSAGE_TTL) {
86 MessageAccessor::put_property(
87 &mut reply_message,
88 PROPERTY_MESSAGE_TTL.clone(),
89 CheetahString::from_slice(ttl),
90 );
91 }
92
93 Ok(reply_message)
94 }
95
96 pub fn get_reply_to_client(reply_message: &Message) -> Option<CheetahString> {
97 reply_message
98 .property(&PROPERTY_MESSAGE_REPLY_TO_CLIENT)
99 .map(CheetahString::from_slice)
100 }
101}