Skip to main content

rocketmq_client_rust/utils/
message_util.rs

1// Copyright 2023 The RocketMQ Rust Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::LazyLock;
16
17use bytes::Bytes;
18use cheetah_string::CheetahString;
19use rocketmq_common::common::message::message_single::Message;
20use rocketmq_common::common::message::MessageConst;
21use rocketmq_common::common::mix_all;
22use rocketmq_common::MessageAccessor::MessageAccessor;
23
24use crate::common::client_error_code::ClientErrorCode;
25
26// Cached static strings to avoid repeated allocations
27static PROPERTY_CLUSTER: LazyLock<CheetahString> =
28    LazyLock::new(|| CheetahString::from_static_str(MessageConst::PROPERTY_CLUSTER));
29static PROPERTY_MESSAGE_TYPE: LazyLock<CheetahString> =
30    LazyLock::new(|| CheetahString::from_static_str(MessageConst::PROPERTY_MESSAGE_TYPE));
31static REPLY_MESSAGE_FLAG: LazyLock<CheetahString> =
32    LazyLock::new(|| CheetahString::from_static_str(mix_all::REPLY_MESSAGE_FLAG));
33static PROPERTY_MESSAGE_REPLY_TO_CLIENT: LazyLock<CheetahString> =
34    LazyLock::new(|| CheetahString::from_static_str(MessageConst::PROPERTY_MESSAGE_REPLY_TO_CLIENT));
35static PROPERTY_CORRELATION_ID: LazyLock<CheetahString> =
36    LazyLock::new(|| CheetahString::from_static_str(MessageConst::PROPERTY_CORRELATION_ID));
37static PROPERTY_MESSAGE_TTL: LazyLock<CheetahString> =
38    LazyLock::new(|| CheetahString::from_static_str(MessageConst::PROPERTY_MESSAGE_TTL));
39
/// Namespace type for request/reply message helpers.
///
/// Carries no data; its helpers are associated functions that take the messages they
/// operate on as arguments.
pub struct MessageUtil;
41
42impl MessageUtil {
43    pub fn create_reply_message(request_message: &Message, body: &[u8]) -> rocketmq_error::RocketMQResult<Message> {
44        // Early return: check required cluster property
45        let Some(cluster) = request_message.property(&PROPERTY_CLUSTER) else {
46            return Err(mq_client_err!(
47                ClientErrorCode::CREATE_REPLY_MESSAGE_EXCEPTION,
48                format!(
49                    "create reply message fail, requestMessage error, property[{}] is null.",
50                    MessageConst::PROPERTY_CLUSTER
51                )
52            ));
53        };
54
55        let mut reply_message = Message::default();
56        reply_message.set_body(Some(Bytes::copy_from_slice(body)));
57
58        let reply_topic = mix_all::get_retry_topic(cluster);
59        reply_message.set_topic(CheetahString::from_string(reply_topic));
60
61        // Set message type using cached static string
62        MessageAccessor::put_property(
63            &mut reply_message,
64            PROPERTY_MESSAGE_TYPE.clone(),
65            REPLY_MESSAGE_FLAG.clone(),
66        );
67
68        // Copy optional properties if present
69        if let Some(reply_to) = request_message.property(&PROPERTY_MESSAGE_REPLY_TO_CLIENT) {
70            MessageAccessor::put_property(
71                &mut reply_message,
72                PROPERTY_MESSAGE_REPLY_TO_CLIENT.clone(),
73                CheetahString::from_slice(reply_to),
74            );
75        }
76
77        if let Some(correlation_id) = request_message.property(&PROPERTY_CORRELATION_ID) {
78            MessageAccessor::put_property(
79                &mut reply_message,
80                PROPERTY_CORRELATION_ID.clone(),
81                CheetahString::from_slice(correlation_id),
82            );
83        }
84
85        if let Some(ttl) = request_message.property(&PROPERTY_MESSAGE_TTL) {
86            MessageAccessor::put_property(
87                &mut reply_message,
88                PROPERTY_MESSAGE_TTL.clone(),
89                CheetahString::from_slice(ttl),
90            );
91        }
92
93        Ok(reply_message)
94    }
95
96    pub fn get_reply_to_client(reply_message: &Message) -> Option<CheetahString> {
97        reply_message
98            .property(&PROPERTY_MESSAGE_REPLY_TO_CLIENT)
99            .map(CheetahString::from_slice)
100    }
101}