rocketmq_client_rust/utils/
message_util.rs1use bytes::Bytes;
18use cheetah_string::CheetahString;
19use rocketmq_common::common::message::message_single::Message;
20use rocketmq_common::common::message::MessageConst;
21use rocketmq_common::common::message::MessageTrait;
22use rocketmq_common::common::mix_all;
23use rocketmq_common::MessageAccessor::MessageAccessor;
24
25use crate::common::client_error_code::ClientErrorCode;
26
/// Stateless holder for message helper functions.
///
/// The type carries no data; its helpers are associated functions invoked as
/// `MessageUtil::...`.
pub struct MessageUtil;
28
29impl MessageUtil {
30 pub fn create_reply_message(
31 request_message: &Message,
32 body: &[u8],
33 ) -> rocketmq_error::RocketMQResult<Message> {
34 let mut reply_message = Message::default();
35 let cluster = request_message.get_property(&CheetahString::from_static_str(
36 MessageConst::PROPERTY_CLUSTER,
37 ));
38 if let Some(cluster) = cluster {
39 reply_message.set_body(Bytes::copy_from_slice(body));
40 let reply_topic = mix_all::get_retry_topic(&cluster);
41 reply_message.set_topic(CheetahString::from_string(reply_topic));
42 MessageAccessor::put_property(
43 &mut reply_message,
44 CheetahString::from_static_str(MessageConst::PROPERTY_MESSAGE_TYPE),
45 CheetahString::from_static_str(mix_all::REPLY_MESSAGE_FLAG),
46 );
47 if let Some(reply_to) = request_message.get_property(&CheetahString::from_static_str(
48 MessageConst::PROPERTY_MESSAGE_REPLY_TO_CLIENT,
49 )) {
50 MessageAccessor::put_property(
51 &mut reply_message,
52 CheetahString::from_static_str(MessageConst::PROPERTY_MESSAGE_REPLY_TO_CLIENT),
53 reply_to,
54 );
55 }
56 if let Some(correlation_id) = request_message.get_property(
57 &CheetahString::from_static_str(MessageConst::PROPERTY_CORRELATION_ID),
58 ) {
59 MessageAccessor::put_property(
60 &mut reply_message,
61 CheetahString::from_static_str(MessageConst::PROPERTY_CORRELATION_ID),
62 correlation_id,
63 );
64 }
65 if let Some(ttl) = request_message.get_property(&CheetahString::from_static_str(
66 MessageConst::PROPERTY_MESSAGE_TTL,
67 )) {
68 MessageAccessor::put_property(
69 &mut reply_message,
70 CheetahString::from_static_str(MessageConst::PROPERTY_MESSAGE_TTL),
71 ttl,
72 );
73 }
74 Ok(reply_message)
75 } else {
76 let error = mq_client_err!(
77 ClientErrorCode::CREATE_REPLY_MESSAGE_EXCEPTION,
78 format!(
79 "create reply message fail, requestMessage error, property[{}] is null.",
80 MessageConst::PROPERTY_CLUSTER
81 )
82 );
83 Err(error)
84 }
85 }
86
87 pub fn get_reply_to_client(reply_message: &Message) -> Option<CheetahString> {
88 reply_message.get_property(&CheetahString::from_static_str(
89 MessageConst::PROPERTY_MESSAGE_REPLY_TO_CLIENT,
90 ))
91 }
92}