rocketmq_common/common/message/
message_property.rs1use std::collections::HashMap;
16
17use cheetah_string::CheetahString;
18
19use crate::common::message::MessageConst;
20
21#[derive(Clone, Debug, Default)]
23pub struct MessageProperties {
24 inner: HashMap<CheetahString, CheetahString>,
25}
26
27impl MessageProperties {
28 pub fn new() -> Self {
30 Self::default()
31 }
32
33 #[inline]
35 pub fn get(&self, key: MessagePropertyKey) -> Option<&str> {
36 self.inner.get(key.as_str()).map(|s| s.as_str())
37 }
38
39 pub(crate) fn insert(&mut self, key: MessagePropertyKey, value: impl Into<CheetahString>) {
41 self.inner.insert(key.to_cheetah_string(), value.into());
42 }
43
44 pub(crate) fn remove(&mut self, key: MessagePropertyKey) -> Option<CheetahString> {
46 self.inner.remove(key.as_str())
47 }
48
49 #[inline]
51 pub fn as_map(&self) -> &HashMap<CheetahString, CheetahString> {
52 &self.inner
53 }
54
55 #[doc(hidden)]
57 #[inline]
58 pub fn as_map_mut(&mut self) -> &mut HashMap<CheetahString, CheetahString> {
59 &mut self.inner
60 }
61
62 #[inline]
64 pub fn from_map(map: HashMap<CheetahString, CheetahString>) -> Self {
65 Self { inner: map }
66 }
67
68 #[inline]
70 pub fn len(&self) -> usize {
71 self.inner.len()
72 }
73
74 #[inline]
76 pub fn is_empty(&self) -> bool {
77 self.inner.is_empty()
78 }
79
80 #[inline]
84 pub fn tags(&self) -> Option<&str> {
85 self.get(MessagePropertyKey::Tags)
86 }
87
88 pub fn keys(&self) -> Option<Vec<String>> {
90 self.get(MessagePropertyKey::Keys).map(|s| {
91 s.split(MessageConst::KEY_SEPARATOR)
92 .filter(|k| !k.is_empty())
93 .map(|k| k.to_string())
94 .collect()
95 })
96 }
97
98 pub fn delay_level(&self) -> Option<i32> {
100 self.get(MessagePropertyKey::DelayTimeLevel)
101 .and_then(|s| s.parse().ok())
102 }
103
104 pub fn delay_time_sec(&self) -> Option<u64> {
106 self.get(MessagePropertyKey::DelayTimeSec).and_then(|s| s.parse().ok())
107 }
108
109 pub fn delay_time_ms(&self) -> Option<u64> {
111 self.get(MessagePropertyKey::DelayTimeMs).and_then(|s| s.parse().ok())
112 }
113
114 pub fn deliver_time_ms(&self) -> Option<u64> {
116 self.get(MessagePropertyKey::DeliverTimeMs).and_then(|s| s.parse().ok())
117 }
118
119 pub fn buyer_id(&self) -> Option<&str> {
121 self.get(MessagePropertyKey::BuyerId)
122 }
123
124 pub fn wait_store_msg_ok(&self) -> bool {
126 self.get(MessagePropertyKey::WaitStoreMsgOk)
127 .map(|s| s != "false")
128 .unwrap_or(true)
129 }
130
131 pub fn origin_message_id(&self) -> Option<&str> {
133 self.get(MessagePropertyKey::OriginMessageId)
134 }
135
136 pub fn retry_topic(&self) -> Option<&str> {
138 self.get(MessagePropertyKey::RetryTopic)
139 }
140
141 pub fn real_topic(&self) -> Option<&str> {
143 self.get(MessagePropertyKey::RealTopic)
144 }
145
146 pub fn real_queue_id(&self) -> Option<i32> {
148 self.get(MessagePropertyKey::RealQueueId).and_then(|s| s.parse().ok())
149 }
150
151 pub fn unique_client_msg_id(&self) -> Option<&str> {
153 self.get(MessagePropertyKey::UniqueClientMsgId)
154 }
155
156 pub fn producer_group(&self) -> Option<&str> {
158 self.get(MessagePropertyKey::ProducerGroup)
159 }
160
161 pub fn instance_id(&self) -> Option<&str> {
163 self.get(MessagePropertyKey::InstanceId)
164 }
165
166 pub fn correlation_id(&self) -> Option<&str> {
168 self.get(MessagePropertyKey::CorrelationId)
169 }
170
171 pub fn message_type(&self) -> Option<&str> {
173 self.get(MessagePropertyKey::MessageType)
174 }
175
176 pub fn trace_switch(&self) -> bool {
178 self.get(MessagePropertyKey::TraceSwitch)
179 .map(|s| s == "true")
180 .unwrap_or(false)
181 }
182}
183
184#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
186pub enum MessagePropertyKey {
187 Tags,
188 Keys,
189 WaitStoreMsgOk,
190 DelayTimeLevel,
191 DelayTimeSec,
192 DelayTimeMs,
193 DeliverTimeMs,
194 RetryTopic,
195 RealTopic,
196 RealQueueId,
197 TransactionPrepared,
198 ProducerGroup,
199 MinOffset,
200 MaxOffset,
201 BuyerId,
202 OriginMessageId,
203 TransferFlag,
204 CorrectionFlag,
205 MqReplyTopic,
206 MqReplyQueueId,
207 UniqueClientMsgId,
208 ReconsumeTime,
209 MsgRegion,
210 TraceSwitch,
211 UniqueKey,
212 MaxReconsumeTimes,
213 ConsumeStartTimestamp,
214 PopCk,
215 PopCkOffset,
216 FirstPopTime,
217 TransactionPreparedQueueOffset,
218 DupInfo,
219 ExtendUniqInfo,
220 InstanceId,
221 CorrelationId,
222 MessageReplyToClient,
223 MessageTtl,
224 ReplyMessageArriveTime,
225 PushReplyTime,
226 Cluster,
227 MessageType,
228 InnerMultiQueueOffset,
229 TimerDelayLevel,
230 TimerEnqueueMs,
231 TimerDequeueMs,
232 TimerRollTimes,
233 TimerOutMs,
234 TimerDelUniqkey,
235 BornHost,
236 BornTimestamp,
237 DlqOriginTopic,
238 DlqOriginMessageId,
239 Crc32,
240 Redirect,
241 ForwardQueueId,
242 InnerBase,
243 InnerMultiDispatch,
244 InnerNum,
245 Mq2Flag,
246 ShardingKey,
247 TransactionId,
248 TransactionCheckTimes,
249 TransientGroupConfig,
250 TransientTopicConfig,
251 TraceContext,
252}
253
254impl MessagePropertyKey {
255 pub fn as_str(&self) -> &'static str {
257 match self {
258 Self::Tags => MessageConst::PROPERTY_TAGS,
259 Self::Keys => MessageConst::PROPERTY_KEYS,
260 Self::WaitStoreMsgOk => MessageConst::PROPERTY_WAIT_STORE_MSG_OK,
261 Self::DelayTimeLevel => MessageConst::PROPERTY_DELAY_TIME_LEVEL,
262 Self::DelayTimeSec => MessageConst::PROPERTY_TIMER_DELAY_SEC,
263 Self::DelayTimeMs => MessageConst::PROPERTY_TIMER_DELAY_MS,
264 Self::DeliverTimeMs => MessageConst::PROPERTY_TIMER_DELIVER_MS,
265 Self::RetryTopic => MessageConst::PROPERTY_RETRY_TOPIC,
266 Self::RealTopic => MessageConst::PROPERTY_REAL_TOPIC,
267 Self::RealQueueId => MessageConst::PROPERTY_REAL_QUEUE_ID,
268 Self::TransactionPrepared => MessageConst::PROPERTY_TRANSACTION_PREPARED,
269 Self::ProducerGroup => MessageConst::PROPERTY_PRODUCER_GROUP,
270 Self::MinOffset => MessageConst::PROPERTY_MIN_OFFSET,
271 Self::MaxOffset => MessageConst::PROPERTY_MAX_OFFSET,
272 Self::BuyerId => MessageConst::PROPERTY_BUYER_ID,
273 Self::OriginMessageId => MessageConst::PROPERTY_ORIGIN_MESSAGE_ID,
274 Self::TransferFlag => MessageConst::PROPERTY_TRANSFER_FLAG,
275 Self::CorrectionFlag => MessageConst::PROPERTY_CORRECTION_FLAG,
276 Self::MqReplyTopic => MessageConst::PROPERTY_MESSAGE_REPLY_TO_CLIENT,
277 Self::MqReplyQueueId => MessageConst::PROPERTY_MESSAGE_REPLY_TO_CLIENT,
278 Self::UniqueClientMsgId => MessageConst::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX,
279 Self::ReconsumeTime => MessageConst::PROPERTY_RECONSUME_TIME,
280 Self::MsgRegion => MessageConst::PROPERTY_MSG_REGION,
281 Self::TraceSwitch => MessageConst::PROPERTY_TRACE_SWITCH,
282 Self::UniqueKey => MessageConst::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX,
283 Self::MaxReconsumeTimes => MessageConst::PROPERTY_MAX_RECONSUME_TIMES,
284 Self::ConsumeStartTimestamp => MessageConst::PROPERTY_CONSUME_START_TIMESTAMP,
285 Self::PopCk => MessageConst::PROPERTY_POP_CK,
286 Self::PopCkOffset => MessageConst::PROPERTY_POP_CK_OFFSET,
287 Self::FirstPopTime => MessageConst::PROPERTY_FIRST_POP_TIME,
288 Self::TransactionPreparedQueueOffset => MessageConst::PROPERTY_TRANSACTION_PREPARED_QUEUE_OFFSET,
289 Self::DupInfo => MessageConst::DUP_INFO,
290 Self::ExtendUniqInfo => MessageConst::PROPERTY_EXTEND_UNIQ_INFO,
291 Self::InstanceId => MessageConst::PROPERTY_INSTANCE_ID,
292 Self::CorrelationId => MessageConst::PROPERTY_CORRELATION_ID,
293 Self::MessageReplyToClient => MessageConst::PROPERTY_MESSAGE_REPLY_TO_CLIENT,
294 Self::MessageTtl => MessageConst::PROPERTY_MESSAGE_TTL,
295 Self::ReplyMessageArriveTime => MessageConst::PROPERTY_REPLY_MESSAGE_ARRIVE_TIME,
296 Self::PushReplyTime => MessageConst::PROPERTY_PUSH_REPLY_TIME,
297 Self::Cluster => MessageConst::PROPERTY_CLUSTER,
298 Self::MessageType => MessageConst::PROPERTY_MESSAGE_TYPE,
299 Self::InnerMultiQueueOffset => MessageConst::PROPERTY_INNER_MULTI_QUEUE_OFFSET,
300 Self::TimerDelayLevel => MessageConst::PROPERTY_TIMER_DELAY_LEVEL,
301 Self::TimerEnqueueMs => MessageConst::PROPERTY_TIMER_ENQUEUE_MS,
302 Self::TimerDequeueMs => MessageConst::PROPERTY_TIMER_DEQUEUE_MS,
303 Self::TimerRollTimes => MessageConst::PROPERTY_TIMER_ROLL_TIMES,
304 Self::TimerOutMs => MessageConst::PROPERTY_TIMER_OUT_MS,
305 Self::TimerDelUniqkey => MessageConst::PROPERTY_TIMER_DEL_UNIQKEY,
306 Self::BornHost => MessageConst::PROPERTY_BORN_HOST,
307 Self::BornTimestamp => MessageConst::PROPERTY_BORN_TIMESTAMP,
308 Self::DlqOriginTopic => MessageConst::PROPERTY_DLQ_ORIGIN_TOPIC,
309 Self::DlqOriginMessageId => MessageConst::PROPERTY_DLQ_ORIGIN_MESSAGE_ID,
310 Self::Crc32 => MessageConst::PROPERTY_CRC32,
311 Self::Redirect => MessageConst::PROPERTY_REDIRECT,
312 Self::ForwardQueueId => MessageConst::PROPERTY_FORWARD_QUEUE_ID,
313 Self::InnerBase => MessageConst::PROPERTY_INNER_BASE,
314 Self::InnerMultiDispatch => MessageConst::PROPERTY_INNER_MULTI_DISPATCH,
315 Self::InnerNum => MessageConst::PROPERTY_INNER_NUM,
316 Self::Mq2Flag => MessageConst::PROPERTY_MQ2_FLAG,
317 Self::ShardingKey => MessageConst::PROPERTY_SHARDING_KEY,
318 Self::TransactionId => MessageConst::PROPERTY_TRANSACTION_ID,
319 Self::TransactionCheckTimes => MessageConst::PROPERTY_TRANSACTION_CHECK_TIMES,
320 Self::TransientGroupConfig => MessageConst::PROPERTY_TRANSIENT_GROUP_CONFIG,
321 Self::TransientTopicConfig => MessageConst::PROPERTY_TRANSIENT_TOPIC_CONFIG,
322 Self::TraceContext => MessageConst::PROPERTY_TRACE_CONTEXT,
323 }
324 }
325
326 pub(crate) fn to_cheetah_string(self) -> CheetahString {
327 CheetahString::from_static_str(self.as_str())
328 }
329}