Skip to main content

rocketmq_common/common/message/
message_property.rs

1// Copyright 2023 The RocketMQ Rust Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use cheetah_string::CheetahString;
18
19use crate::common::message::MessageConst;
20
21/// Type-safe message properties.
22#[derive(Clone, Debug, Default)]
23pub struct MessageProperties {
24    inner: HashMap<CheetahString, CheetahString>,
25}
26
27impl MessageProperties {
28    /// Creates a new empty properties collection.
29    pub fn new() -> Self {
30        Self::default()
31    }
32
33    /// Gets a property value by key.
34    #[inline]
35    pub fn get(&self, key: MessagePropertyKey) -> Option<&str> {
36        self.inner.get(key.as_str()).map(|s| s.as_str())
37    }
38
39    /// Sets a property value.
40    pub(crate) fn insert(&mut self, key: MessagePropertyKey, value: impl Into<CheetahString>) {
41        self.inner.insert(key.to_cheetah_string(), value.into());
42    }
43
44    /// Removes a property.
45    pub(crate) fn remove(&mut self, key: MessagePropertyKey) -> Option<CheetahString> {
46        self.inner.remove(key.as_str())
47    }
48
49    /// Returns all properties as a map (for serialization and external crates).
50    #[inline]
51    pub fn as_map(&self) -> &HashMap<CheetahString, CheetahString> {
52        &self.inner
53    }
54
55    /// Returns all properties as a mutable map (for internal use).
56    #[doc(hidden)]
57    #[inline]
58    pub fn as_map_mut(&mut self) -> &mut HashMap<CheetahString, CheetahString> {
59        &mut self.inner
60    }
61
62    /// Creates MessageProperties from a HashMap.
63    #[inline]
64    pub fn from_map(map: HashMap<CheetahString, CheetahString>) -> Self {
65        Self { inner: map }
66    }
67
68    /// Returns the number of properties.
69    #[inline]
70    pub fn len(&self) -> usize {
71        self.inner.len()
72    }
73
74    /// Returns true if there are no properties.
75    #[inline]
76    pub fn is_empty(&self) -> bool {
77        self.inner.is_empty()
78    }
79
80    // Convenience accessors for well-known properties
81
82    /// Returns the tags, if any.
83    #[inline]
84    pub fn tags(&self) -> Option<&str> {
85        self.get(MessagePropertyKey::Tags)
86    }
87
88    /// Returns the keys as a vector, if any.
89    pub fn keys(&self) -> Option<Vec<String>> {
90        self.get(MessagePropertyKey::Keys).map(|s| {
91            s.split(MessageConst::KEY_SEPARATOR)
92                .filter(|k| !k.is_empty())
93                .map(|k| k.to_string())
94                .collect()
95        })
96    }
97
98    /// Returns the delay time level.
99    pub fn delay_level(&self) -> Option<i32> {
100        self.get(MessagePropertyKey::DelayTimeLevel)
101            .and_then(|s| s.parse().ok())
102    }
103
104    /// Returns the delay time in seconds.
105    pub fn delay_time_sec(&self) -> Option<u64> {
106        self.get(MessagePropertyKey::DelayTimeSec).and_then(|s| s.parse().ok())
107    }
108
109    /// Returns the delay time in milliseconds.
110    pub fn delay_time_ms(&self) -> Option<u64> {
111        self.get(MessagePropertyKey::DelayTimeMs).and_then(|s| s.parse().ok())
112    }
113
114    /// Returns the delivery time in milliseconds.
115    pub fn deliver_time_ms(&self) -> Option<u64> {
116        self.get(MessagePropertyKey::DeliverTimeMs).and_then(|s| s.parse().ok())
117    }
118
119    /// Returns the buyer ID.
120    pub fn buyer_id(&self) -> Option<&str> {
121        self.get(MessagePropertyKey::BuyerId)
122    }
123
124    /// Returns whether to wait for store confirmation.
125    pub fn wait_store_msg_ok(&self) -> bool {
126        self.get(MessagePropertyKey::WaitStoreMsgOk)
127            .map(|s| s != "false")
128            .unwrap_or(true)
129    }
130
131    /// Returns the origin message ID.
132    pub fn origin_message_id(&self) -> Option<&str> {
133        self.get(MessagePropertyKey::OriginMessageId)
134    }
135
136    /// Returns the retry topic.
137    pub fn retry_topic(&self) -> Option<&str> {
138        self.get(MessagePropertyKey::RetryTopic)
139    }
140
141    /// Returns the real topic.
142    pub fn real_topic(&self) -> Option<&str> {
143        self.get(MessagePropertyKey::RealTopic)
144    }
145
146    /// Returns the real queue ID.
147    pub fn real_queue_id(&self) -> Option<i32> {
148        self.get(MessagePropertyKey::RealQueueId).and_then(|s| s.parse().ok())
149    }
150
151    /// Returns the unique client message ID.
152    pub fn unique_client_msg_id(&self) -> Option<&str> {
153        self.get(MessagePropertyKey::UniqueClientMsgId)
154    }
155
156    /// Returns the producer group.
157    pub fn producer_group(&self) -> Option<&str> {
158        self.get(MessagePropertyKey::ProducerGroup)
159    }
160
161    /// Returns the instance ID.
162    pub fn instance_id(&self) -> Option<&str> {
163        self.get(MessagePropertyKey::InstanceId)
164    }
165
166    /// Returns the correlation ID.
167    pub fn correlation_id(&self) -> Option<&str> {
168        self.get(MessagePropertyKey::CorrelationId)
169    }
170
171    /// Returns the message type.
172    pub fn message_type(&self) -> Option<&str> {
173        self.get(MessagePropertyKey::MessageType)
174    }
175
176    /// Returns the trace switch.
177    pub fn trace_switch(&self) -> bool {
178        self.get(MessagePropertyKey::TraceSwitch)
179            .map(|s| s == "true")
180            .unwrap_or(false)
181    }
182}
183
184/// Type-safe property keys.
185#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
186pub enum MessagePropertyKey {
187    Tags,
188    Keys,
189    WaitStoreMsgOk,
190    DelayTimeLevel,
191    DelayTimeSec,
192    DelayTimeMs,
193    DeliverTimeMs,
194    RetryTopic,
195    RealTopic,
196    RealQueueId,
197    TransactionPrepared,
198    ProducerGroup,
199    MinOffset,
200    MaxOffset,
201    BuyerId,
202    OriginMessageId,
203    TransferFlag,
204    CorrectionFlag,
205    MqReplyTopic,
206    MqReplyQueueId,
207    UniqueClientMsgId,
208    ReconsumeTime,
209    MsgRegion,
210    TraceSwitch,
211    UniqueKey,
212    MaxReconsumeTimes,
213    ConsumeStartTimestamp,
214    PopCk,
215    PopCkOffset,
216    FirstPopTime,
217    TransactionPreparedQueueOffset,
218    DupInfo,
219    ExtendUniqInfo,
220    InstanceId,
221    CorrelationId,
222    MessageReplyToClient,
223    MessageTtl,
224    ReplyMessageArriveTime,
225    PushReplyTime,
226    Cluster,
227    MessageType,
228    InnerMultiQueueOffset,
229    TimerDelayLevel,
230    TimerEnqueueMs,
231    TimerDequeueMs,
232    TimerRollTimes,
233    TimerOutMs,
234    TimerDelUniqkey,
235    BornHost,
236    BornTimestamp,
237    DlqOriginTopic,
238    DlqOriginMessageId,
239    Crc32,
240    Redirect,
241    ForwardQueueId,
242    InnerBase,
243    InnerMultiDispatch,
244    InnerNum,
245    Mq2Flag,
246    ShardingKey,
247    TransactionId,
248    TransactionCheckTimes,
249    TransientGroupConfig,
250    TransientTopicConfig,
251    TraceContext,
252}
253
254impl MessagePropertyKey {
255    /// Returns the string representation of this key.
256    pub fn as_str(&self) -> &'static str {
257        match self {
258            Self::Tags => MessageConst::PROPERTY_TAGS,
259            Self::Keys => MessageConst::PROPERTY_KEYS,
260            Self::WaitStoreMsgOk => MessageConst::PROPERTY_WAIT_STORE_MSG_OK,
261            Self::DelayTimeLevel => MessageConst::PROPERTY_DELAY_TIME_LEVEL,
262            Self::DelayTimeSec => MessageConst::PROPERTY_TIMER_DELAY_SEC,
263            Self::DelayTimeMs => MessageConst::PROPERTY_TIMER_DELAY_MS,
264            Self::DeliverTimeMs => MessageConst::PROPERTY_TIMER_DELIVER_MS,
265            Self::RetryTopic => MessageConst::PROPERTY_RETRY_TOPIC,
266            Self::RealTopic => MessageConst::PROPERTY_REAL_TOPIC,
267            Self::RealQueueId => MessageConst::PROPERTY_REAL_QUEUE_ID,
268            Self::TransactionPrepared => MessageConst::PROPERTY_TRANSACTION_PREPARED,
269            Self::ProducerGroup => MessageConst::PROPERTY_PRODUCER_GROUP,
270            Self::MinOffset => MessageConst::PROPERTY_MIN_OFFSET,
271            Self::MaxOffset => MessageConst::PROPERTY_MAX_OFFSET,
272            Self::BuyerId => MessageConst::PROPERTY_BUYER_ID,
273            Self::OriginMessageId => MessageConst::PROPERTY_ORIGIN_MESSAGE_ID,
274            Self::TransferFlag => MessageConst::PROPERTY_TRANSFER_FLAG,
275            Self::CorrectionFlag => MessageConst::PROPERTY_CORRECTION_FLAG,
276            Self::MqReplyTopic => MessageConst::PROPERTY_MESSAGE_REPLY_TO_CLIENT,
277            Self::MqReplyQueueId => MessageConst::PROPERTY_MESSAGE_REPLY_TO_CLIENT,
278            Self::UniqueClientMsgId => MessageConst::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX,
279            Self::ReconsumeTime => MessageConst::PROPERTY_RECONSUME_TIME,
280            Self::MsgRegion => MessageConst::PROPERTY_MSG_REGION,
281            Self::TraceSwitch => MessageConst::PROPERTY_TRACE_SWITCH,
282            Self::UniqueKey => MessageConst::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX,
283            Self::MaxReconsumeTimes => MessageConst::PROPERTY_MAX_RECONSUME_TIMES,
284            Self::ConsumeStartTimestamp => MessageConst::PROPERTY_CONSUME_START_TIMESTAMP,
285            Self::PopCk => MessageConst::PROPERTY_POP_CK,
286            Self::PopCkOffset => MessageConst::PROPERTY_POP_CK_OFFSET,
287            Self::FirstPopTime => MessageConst::PROPERTY_FIRST_POP_TIME,
288            Self::TransactionPreparedQueueOffset => MessageConst::PROPERTY_TRANSACTION_PREPARED_QUEUE_OFFSET,
289            Self::DupInfo => MessageConst::DUP_INFO,
290            Self::ExtendUniqInfo => MessageConst::PROPERTY_EXTEND_UNIQ_INFO,
291            Self::InstanceId => MessageConst::PROPERTY_INSTANCE_ID,
292            Self::CorrelationId => MessageConst::PROPERTY_CORRELATION_ID,
293            Self::MessageReplyToClient => MessageConst::PROPERTY_MESSAGE_REPLY_TO_CLIENT,
294            Self::MessageTtl => MessageConst::PROPERTY_MESSAGE_TTL,
295            Self::ReplyMessageArriveTime => MessageConst::PROPERTY_REPLY_MESSAGE_ARRIVE_TIME,
296            Self::PushReplyTime => MessageConst::PROPERTY_PUSH_REPLY_TIME,
297            Self::Cluster => MessageConst::PROPERTY_CLUSTER,
298            Self::MessageType => MessageConst::PROPERTY_MESSAGE_TYPE,
299            Self::InnerMultiQueueOffset => MessageConst::PROPERTY_INNER_MULTI_QUEUE_OFFSET,
300            Self::TimerDelayLevel => MessageConst::PROPERTY_TIMER_DELAY_LEVEL,
301            Self::TimerEnqueueMs => MessageConst::PROPERTY_TIMER_ENQUEUE_MS,
302            Self::TimerDequeueMs => MessageConst::PROPERTY_TIMER_DEQUEUE_MS,
303            Self::TimerRollTimes => MessageConst::PROPERTY_TIMER_ROLL_TIMES,
304            Self::TimerOutMs => MessageConst::PROPERTY_TIMER_OUT_MS,
305            Self::TimerDelUniqkey => MessageConst::PROPERTY_TIMER_DEL_UNIQKEY,
306            Self::BornHost => MessageConst::PROPERTY_BORN_HOST,
307            Self::BornTimestamp => MessageConst::PROPERTY_BORN_TIMESTAMP,
308            Self::DlqOriginTopic => MessageConst::PROPERTY_DLQ_ORIGIN_TOPIC,
309            Self::DlqOriginMessageId => MessageConst::PROPERTY_DLQ_ORIGIN_MESSAGE_ID,
310            Self::Crc32 => MessageConst::PROPERTY_CRC32,
311            Self::Redirect => MessageConst::PROPERTY_REDIRECT,
312            Self::ForwardQueueId => MessageConst::PROPERTY_FORWARD_QUEUE_ID,
313            Self::InnerBase => MessageConst::PROPERTY_INNER_BASE,
314            Self::InnerMultiDispatch => MessageConst::PROPERTY_INNER_MULTI_DISPATCH,
315            Self::InnerNum => MessageConst::PROPERTY_INNER_NUM,
316            Self::Mq2Flag => MessageConst::PROPERTY_MQ2_FLAG,
317            Self::ShardingKey => MessageConst::PROPERTY_SHARDING_KEY,
318            Self::TransactionId => MessageConst::PROPERTY_TRANSACTION_ID,
319            Self::TransactionCheckTimes => MessageConst::PROPERTY_TRANSACTION_CHECK_TIMES,
320            Self::TransientGroupConfig => MessageConst::PROPERTY_TRANSIENT_GROUP_CONFIG,
321            Self::TransientTopicConfig => MessageConst::PROPERTY_TRANSIENT_TOPIC_CONFIG,
322            Self::TraceContext => MessageConst::PROPERTY_TRACE_CONTEXT,
323        }
324    }
325
326    pub(crate) fn to_cheetah_string(self) -> CheetahString {
327        CheetahString::from_static_str(self.as_str())
328    }
329}