Skip to main content

rocketmq_common/common/
message.rs

1// Copyright 2023 The RocketMQ Rust Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::collections::HashMap;
17use std::collections::HashSet;
18use std::fmt;
19use std::fmt::Debug;
20use std::fmt::Display;
21use std::string::ToString;
22use std::sync::LazyLock;
23
24use bytes::Buf;
25use bytes::Bytes;
26use cheetah_string::CheetahString;
27use rocketmq_error::RocketMQError;
28use rocketmq_error::RocketMQResult;
29
30pub mod message_accessor;
31pub mod message_batch;
32pub mod message_batch_v2;
33pub mod message_body;
34pub mod message_builder;
35pub mod message_client_ext;
36pub mod message_client_id_setter;
37pub mod message_decoder;
38pub mod message_enum;
39pub mod message_ext;
40pub mod message_ext_broker_inner;
41pub mod message_flag;
42pub mod message_id;
43pub mod message_property;
44pub mod message_queue;
45pub mod message_queue_assignment;
46pub mod message_queue_for_c;
47pub mod message_single;
48
49// New refactored message types
50pub mod broker_message;
51pub mod message_envelope;
52pub mod routing_context;
53pub mod storage_metadata;
54
55/// Defines the interface for RocketMQ message operations.
56///
57/// Provides methods for managing message properties, keys, tags, body, and metadata.
58pub trait MessageTrait: Any + Display + Debug {
59    /// Sets the keys for the message.
60    #[inline]
61    fn set_keys(&mut self, keys: CheetahString) {
62        self.put_property(CheetahString::from_static_str(MessageConst::PROPERTY_KEYS), keys);
63    }
64
65    /// Adds a property to the message.
66    fn put_property(&mut self, key: CheetahString, value: CheetahString);
67
68    /// Removes the specified property from the message.
69    fn clear_property(&mut self, name: &str);
70
71    /// Adds a user-defined property to the message.
72    ///
73    /// # Errors
74    ///
75    /// Returns an error if the property name is reserved by the system or if the name or value is
76    /// empty.
77    fn put_user_property(&mut self, name: CheetahString, value: CheetahString) -> RocketMQResult<()> {
78        if name.is_empty() || value.is_empty() {
79            return Err(RocketMQError::InvalidProperty(
80                "The name or value of property can not be null or blank string!".to_string(),
81            ));
82        }
83        if STRING_HASH_SET.contains(name.as_str()) {
84            return Err(RocketMQError::InvalidProperty(format!(
85                "The Property<{name}> is used by system, input another please"
86            )));
87        }
88        self.put_property(name, value);
89        Ok(())
90    }
91
92    /// Retrieves a user-defined property value.
93    fn user_property(&self, name: &CheetahString) -> Option<CheetahString> {
94        self.property(name)
95    }
96
97    /// Retrieves a reference to a user-defined property value.
98    fn user_property_ref(&self, name: &CheetahString) -> Option<&CheetahString> {
99        self.property_ref(name)
100    }
101
102    /// Retrieves a property value.
103    fn property(&self, name: &CheetahString) -> Option<CheetahString>;
104
105    /// Retrieves a reference to a property value.
106    fn property_ref(&self, name: &CheetahString) -> Option<&CheetahString>;
107
108    /// Returns the topic of the message.
109    fn topic(&self) -> &CheetahString;
110
111    /// Sets the topic for the message.
112    fn set_topic(&mut self, topic: CheetahString);
113
114    /// Returns the tags associated with the message.
115    fn tags(&self) -> Option<CheetahString> {
116        self.property(&CheetahString::from_static_str(MessageConst::PROPERTY_TAGS))
117    }
118
119    /// Returns a reference to the tags associated with the message.
120    fn tags_ref(&self) -> Option<&CheetahString> {
121        self.property_ref(&CheetahString::from_static_str(MessageConst::PROPERTY_TAGS))
122    }
123
124    /// Sets the tags for the message.
125    fn set_tags(&mut self, tags: CheetahString) {
126        self.put_property(CheetahString::from_static_str(MessageConst::PROPERTY_TAGS), tags);
127    }
128
129    /// Returns the keys associated with the message.
130    fn get_keys(&self) -> Option<CheetahString> {
131        self.property(&CheetahString::from_static_str(MessageConst::PROPERTY_KEYS))
132    }
133    /// Returns a reference to the keys associated with the message.
134    fn get_keys_ref(&self) -> Option<&CheetahString> {
135        self.property_ref(&CheetahString::from_static_str(MessageConst::PROPERTY_KEYS))
136    }
137    /// Sets the message keys from a collection, joining them with spaces.
138    fn set_keys_from_collection(&mut self, key_collection: Vec<String>) {
139        let keys = key_collection.join(MessageConst::KEY_SEPARATOR);
140        self.set_keys(CheetahString::from_string(keys));
141    }
142
143    /// Returns the delay time level of the message, or 0 if not set.
144    fn delay_time_level(&self) -> i32 {
145        self.property(&CheetahString::from_static_str(MessageConst::PROPERTY_DELAY_TIME_LEVEL))
146            .and_then(|v| v.parse().ok())
147            .unwrap_or(0)
148    }
149
150    /// Sets the delay time level for the message.
151    fn set_delay_time_level(&mut self, level: i32) {
152        self.put_property(
153            CheetahString::from_static_str(MessageConst::PROPERTY_DELAY_TIME_LEVEL),
154            CheetahString::from_string(level.to_string()),
155        );
156    }
157
158    /// Returns whether the message should wait for store acknowledgment.
159    ///
160    /// Defaults to `true` if not set.
161    fn is_wait_store_msg_ok(&self) -> bool {
162        self.property(&CheetahString::from_static_str(
163            MessageConst::PROPERTY_WAIT_STORE_MSG_OK,
164        ))
165        .map(|v| v.as_str() != "false")
166        .unwrap_or(true)
167    }
168
169    /// Sets whether the message should wait for store acknowledgment.
170    fn set_wait_store_msg_ok(&mut self, wait_store_msg_ok: bool) {
171        self.put_property(
172            CheetahString::from_static_str(MessageConst::PROPERTY_WAIT_STORE_MSG_OK),
173            CheetahString::from_string(wait_store_msg_ok.to_string()),
174        );
175    }
176
177    /// Sets the instance ID for the message.
178    fn set_instance_id(&mut self, instance_id: CheetahString) {
179        self.put_property(
180            CheetahString::from_static_str(MessageConst::PROPERTY_INSTANCE_ID),
181            instance_id,
182        );
183    }
184
185    /// Returns the flag associated with the message.
186    fn get_flag(&self) -> i32;
187
188    /// Sets the flag for the message.
189    fn set_flag(&mut self, flag: i32);
190
191    /// Returns the body of the message.
192    fn get_body(&self) -> Option<&Bytes>;
193
194    /// Sets the body of the message.
195    fn set_body(&mut self, body: Bytes);
196
197    /// Returns all properties associated with the message.
198    fn get_properties(&self) -> &HashMap<CheetahString, CheetahString>;
199
200    /// Sets multiple properties for the message.
201    fn set_properties(&mut self, properties: HashMap<CheetahString, CheetahString>);
202
203    /// Returns the buyer ID associated with the message.
204    fn buyer_id(&self) -> Option<CheetahString> {
205        self.property(&CheetahString::from_static_str(MessageConst::PROPERTY_BUYER_ID))
206    }
207
208    /// Returns a reference to the buyer ID associated with the message.
209    fn buyer_id_ref(&self) -> Option<&CheetahString> {
210        self.property_ref(&CheetahString::from_static_str(MessageConst::PROPERTY_BUYER_ID))
211    }
212
213    /// Sets the buyer ID for the message.
214    fn set_buyer_id(&mut self, buyer_id: CheetahString) {
215        self.put_property(
216            CheetahString::from_static_str(MessageConst::PROPERTY_BUYER_ID),
217            buyer_id,
218        );
219    }
220
221    /// Retrieves the transaction ID associated with the message.
222    ///
223    /// # Returns
224    ///
225    /// A reference to the transaction ID as a `&str`.
226    fn transaction_id(&self) -> Option<&CheetahString>;
227
228    /// Sets the transaction ID for the message.
229    fn set_transaction_id(&mut self, transaction_id: CheetahString);
230
231    /// Sets the delay time for the message in seconds.
232    fn set_delay_time_sec(&mut self, sec: u64) {
233        self.put_property(
234            CheetahString::from_static_str(MessageConst::PROPERTY_TIMER_DELAY_SEC),
235            CheetahString::from_string(sec.to_string()),
236        );
237    }
238
239    /// Returns the delay time for the message in seconds, or 0 if not set.
240    fn get_delay_time_sec(&self) -> u64 {
241        self.property(&CheetahString::from_static_str(MessageConst::PROPERTY_TIMER_DELAY_SEC))
242            .and_then(|v| v.parse().ok())
243            .unwrap_or(0)
244    }
245
246    /// Sets the delay time for the message in milliseconds.
247    fn set_delay_time_ms(&mut self, time_ms: u64) {
248        self.put_property(
249            CheetahString::from_static_str(MessageConst::PROPERTY_TIMER_DELAY_MS),
250            CheetahString::from_string(time_ms.to_string()),
251        );
252    }
253
254    /// Returns the delay time for the message in milliseconds, or 0 if not set.
255    fn get_delay_time_ms(&self) -> u64 {
256        self.property(&CheetahString::from_static_str(MessageConst::PROPERTY_TIMER_DELAY_MS))
257            .and_then(|v| v.parse().ok())
258            .unwrap_or(0)
259    }
260
261    /// Sets the delivery time for the message in milliseconds.
262    fn set_deliver_time_ms(&mut self, time_ms: u64) {
263        self.put_property(
264            CheetahString::from_static_str(MessageConst::PROPERTY_TIMER_DELIVER_MS),
265            CheetahString::from_string(time_ms.to_string()),
266        );
267    }
268
269    /// Returns the delivery time for the message in milliseconds, or 0 if not set.
270    fn get_deliver_time_ms(&self) -> u64 {
271        self.property(&CheetahString::from_static_str(MessageConst::PROPERTY_TIMER_DELIVER_MS))
272            .and_then(|v| v.parse().ok())
273            .unwrap_or(0)
274    }
275
276    /// Returns a mutable reference to the compressed body of the message.
277    fn get_compressed_body_mut(&mut self) -> Option<&mut Bytes>;
278
279    /// Returns a reference to the compressed body of the message.
280    fn get_compressed_body(&self) -> Option<&Bytes>;
281
282    /// Sets the compressed body of the message.
283    fn set_compressed_body_mut(&mut self, compressed_body: Bytes);
284
285    /// Takes ownership of the message body, leaving it empty.
286    fn take_body(&mut self) -> Option<Bytes>;
287
288    /// Returns a reference to the message as a trait object.
289    fn as_any(&self) -> &dyn Any;
290
291    /// Returns a mutable reference to the message as a trait object.
292    fn as_any_mut(&mut self) -> &mut dyn Any;
293}
294
#[cfg(test)]
mod tests {
    use cheetah_string::CheetahString;

    use super::MessageTrait;
    use crate::common::message::message_builder::MessageBuilder;
    use crate::common::message::message_single::Message;

    /// A transaction ID written through the trait setter must be readable back through a
    /// `&dyn MessageTrait` trait object.
    #[test]
    fn trait_object_get_transaction_id_uses_default_compat_accessor() {
        let expected_id = CheetahString::from("tx-123");

        let mut msg: Message = MessageBuilder::new()
            .topic("TopicTest")
            .body_slice(b"payload")
            .build_unchecked();
        MessageTrait::set_transaction_id(&mut msg, CheetahString::from("tx-123"));

        // Go through dynamic dispatch on purpose: the accessor must work on a trait object.
        let as_trait_object: &dyn MessageTrait = &msg;
        let observed_id = as_trait_object.transaction_id();

        assert_eq!(observed_id, Some(&expected_id));
    }
}
315
/// Magic code for message format version 1.
///
/// Written as its 32-bit pattern; the signed decimal value is `-626843481`.
pub const MESSAGE_MAGIC_CODE_V1: i32 = 0xDAA3_20A7_u32 as i32;

/// Magic code for message format version 2.
///
/// Written as its 32-bit pattern; the signed decimal value is `-626843477`.
pub const MESSAGE_MAGIC_CODE_V2: i32 = 0xDAA3_20AB_u32 as i32;
321
/// Represents the message format version.
///
/// [`MessageVersion::V1`] is the default.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash)]
pub enum MessageVersion {
    /// Message format version 1.
    #[default]
    V1,
    /// Message format version 2.
    V2,
}
329
330impl fmt::Display for MessageVersion {
331    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
332        match self {
333            MessageVersion::V1 => write!(f, "V1"),
334            MessageVersion::V2 => write!(f, "V2"),
335        }
336    }
337}
338
339impl MessageVersion {
340    /// Returns the message version corresponding to the given magic code.
341    ///
342    /// # Errors
343    ///
344    /// Returns an error if the magic code is not recognized.
345    pub fn value_of_magic_code(magic_code: i32) -> Result<MessageVersion, &'static str> {
346        match magic_code {
347            MESSAGE_MAGIC_CODE_V1 => Ok(MessageVersion::V1),
348            MESSAGE_MAGIC_CODE_V2 => Ok(MessageVersion::V2),
349            _ => Err("Invalid magicCode"),
350        }
351    }
352
353    /// Returns the magic code for this message version.
354    pub fn get_magic_code(&self) -> i32 {
355        match self {
356            MessageVersion::V1 => MESSAGE_MAGIC_CODE_V1,
357            MessageVersion::V2 => MESSAGE_MAGIC_CODE_V2,
358        }
359    }
360
361    /// Returns the number of bytes used to encode the topic length.
362    pub fn get_topic_length_size(&self) -> usize {
363        match self {
364            MessageVersion::V1 => 1,
365            MessageVersion::V2 => 2,
366        }
367    }
368
369    /// Reads and returns the topic length from the buffer, advancing the buffer position.
370    pub fn get_topic_length(&self, buffer: &mut Bytes) -> usize {
371        match self {
372            MessageVersion::V1 => buffer.get_u8() as usize,
373            MessageVersion::V2 => buffer.get_i16() as usize,
374        }
375    }
376
377    /// Returns the topic length from the buffer at the specified index without advancing the
378    /// position.
379    pub fn get_topic_length_at_index(&self, buffer: &[u8], index: usize) -> usize {
380        match self {
381            MessageVersion::V1 => buffer[index] as usize,
382            MessageVersion::V2 => ((buffer[index] as usize) << 8) | (buffer[index + 1] as usize),
383        }
384    }
385
386    /// Writes the topic length to the buffer according to the message version.
387    pub fn put_topic_length(&self, buffer: &mut Vec<u8>, topic_length: usize) {
388        match self {
389            MessageVersion::V1 => buffer.push(topic_length as u8),
390            MessageVersion::V2 => {
391                buffer.push((topic_length >> 8) as u8);
392                buffer.push((topic_length & 0xFF) as u8);
393            }
394        }
395    }
396
397    /// Returns `true` if this is message format version 1.
398    pub fn is_v1(&self) -> bool {
399        match self {
400            MessageVersion::V1 => true,
401            MessageVersion::V2 => false,
402        }
403    }
404
405    /// Returns `true` if this is message format version 2.
406    pub fn is_v2(&self) -> bool {
407        match self {
408            MessageVersion::V1 => false,
409            MessageVersion::V2 => true,
410        }
411    }
412}
413
/// Defines constants for message property names and configuration values.
///
/// The type carries no data; it only namespaces the associated string constants below.
pub struct MessageConst;

impl MessageConst {
    // ---- General -------------------------------------------------------------------------

    pub const DUP_INFO: &'static str = "DUP_INFO";
    /// Separator placed between individual keys when several keys are joined into one value.
    pub const KEY_SEPARATOR: &'static str = " ";

    // ---- Property names (`PROPERTY_*`) ---------------------------------------------------

    /// Host address where the message was born.
    pub const PROPERTY_BORN_HOST: &'static str = "__BORNHOST";
    /// Timestamp when the message was born.
    pub const PROPERTY_BORN_TIMESTAMP: &'static str = "BORN_TIMESTAMP";
    /// Property name under which the buyer ID is stored.
    pub const PROPERTY_BUYER_ID: &'static str = "BUYER_ID";
    /// Time in seconds during which transaction checks are suppressed.
    pub const PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS: &'static str = "CHECK_IMMUNITY_TIME_IN_SECONDS";
    pub const PROPERTY_CLUSTER: &'static str = "CLUSTER";
    pub const PROPERTY_CONSUME_START_TIMESTAMP: &'static str = "CONSUME_START_TIME";
    pub const PROPERTY_CORRECTION_FLAG: &'static str = "CORRECTION_FLAG";
    pub const PROPERTY_CORRELATION_ID: &'static str = "CORRELATION_ID";
    pub const PROPERTY_CRC32: &'static str = "__CRC32#";
    /// Property name under which the delay time level is stored.
    pub const PROPERTY_DELAY_TIME_LEVEL: &'static str = "DELAY";
    pub const PROPERTY_DLQ_ORIGIN_MESSAGE_ID: &'static str = "DLQ_ORIGIN_MESSAGE_ID";
    /// Original topic name for messages in the dead-letter queue.
    pub const PROPERTY_DLQ_ORIGIN_TOPIC: &'static str = "DLQ_ORIGIN_TOPIC";
    pub const PROPERTY_EXTEND_UNIQ_INFO: &'static str = "EXTEND_UNIQ_INFO";
    pub const PROPERTY_FIRST_POP_TIME: &'static str = "1ST_POP_TIME";
    pub const PROPERTY_FORWARD_QUEUE_ID: &'static str = "PROPERTY_FORWARD_QUEUE_ID";
    pub const PROPERTY_INNER_BASE: &'static str = "INNER_BASE";
    pub const PROPERTY_INNER_MULTI_DISPATCH: &'static str = "INNER_MULTI_DISPATCH";
    pub const PROPERTY_INNER_MULTI_QUEUE_OFFSET: &'static str = "INNER_MULTI_QUEUE_OFFSET";
    pub const PROPERTY_INNER_NUM: &'static str = "INNER_NUM";
    /// Property name under which the instance ID is stored.
    pub const PROPERTY_INSTANCE_ID: &'static str = "INSTANCE_ID";
    /// Property name under which the message keys are stored.
    pub const PROPERTY_KEYS: &'static str = "KEYS";
    pub const PROPERTY_MAX_OFFSET: &'static str = "MAX_OFFSET";
    pub const PROPERTY_MAX_RECONSUME_TIMES: &'static str = "MAX_RECONSUME_TIMES";
    pub const PROPERTY_MESSAGE_REPLY_TO_CLIENT: &'static str = "REPLY_TO_CLIENT";
    pub const PROPERTY_MESSAGE_TTL: &'static str = "TTL";
    pub const PROPERTY_MESSAGE_TYPE: &'static str = "MSG_TYPE";
    pub const PROPERTY_MIN_OFFSET: &'static str = "MIN_OFFSET";
    pub const PROPERTY_MQ2_FLAG: &'static str = "MQ2_FLAG";
    pub const PROPERTY_MSG_REGION: &'static str = "MSG_REGION";
    pub const PROPERTY_ORIGIN_MESSAGE_ID: &'static str = "ORIGIN_MESSAGE_ID";
    pub const PROPERTY_POP_CK: &'static str = "POP_CK";
    pub const PROPERTY_POP_CK_OFFSET: &'static str = "POP_CK_OFFSET";
    pub const PROPERTY_PRODUCER_GROUP: &'static str = "PGROUP";
    pub const PROPERTY_PUSH_REPLY_TIME: &'static str = "PUSH_REPLY_TIME";
    pub const PROPERTY_REAL_QUEUE_ID: &'static str = "REAL_QID";
    pub const PROPERTY_REAL_TOPIC: &'static str = "REAL_TOPIC";
    pub const PROPERTY_RECONSUME_TIME: &'static str = "RECONSUME_TIME";
    pub const PROPERTY_REDIRECT: &'static str = "REDIRECT";
    pub const PROPERTY_REPLY_MESSAGE_ARRIVE_TIME: &'static str = "ARRIVE_TIME";
    pub const PROPERTY_RETRY_TOPIC: &'static str = "RETRY_TOPIC";
    pub const PROPERTY_SHARDING_KEY: &'static str = "__SHARDINGKEY";
    // NOTE(review): the identifier reads as a mis-split "START_DELIVER_TIME"; it is kept
    // verbatim because renaming it would break existing references.
    pub const PROPERTY_STARTDE_LIVER_TIME: &'static str = "__STARTDELIVERTIME";
    /// Property name under which the message tags are stored.
    pub const PROPERTY_TAGS: &'static str = "TAGS";
    pub const PROPERTY_TIMER_DELAY_LEVEL: &'static str = "TIMER_DELAY_LEVEL";
    /// Property name under which the delay time in milliseconds is stored.
    pub const PROPERTY_TIMER_DELAY_MS: &'static str = "TIMER_DELAY_MS";
    /// Property name under which the delay time in seconds is stored.
    pub const PROPERTY_TIMER_DELAY_SEC: &'static str = "TIMER_DELAY_SEC";
    /// Property name under which the delivery time in milliseconds is stored.
    pub const PROPERTY_TIMER_DELIVER_MS: &'static str = "TIMER_DELIVER_MS";
    pub const PROPERTY_TIMER_DEL_UNIQKEY: &'static str = "TIMER_DEL_UNIQKEY";
    pub const PROPERTY_TIMER_DEQUEUE_MS: &'static str = "TIMER_DEQUEUE_MS";
    pub const PROPERTY_TIMER_ENQUEUE_MS: &'static str = "TIMER_ENQUEUE_MS";
    pub const PROPERTY_TIMER_OUT_MS: &'static str = "TIMER_OUT_MS";
    pub const PROPERTY_TIMER_ROLL_TIMES: &'static str = "TIMER_ROLL_TIMES";
    pub const PROPERTY_TRACE_CONTEXT: &'static str = "TRACE_CONTEXT";
    pub const PROPERTY_TRACE_SWITCH: &'static str = "TRACE_ON";
    pub const PROPERTY_TRANSACTION_CHECK_TIMES: &'static str = "TRANSACTION_CHECK_TIMES";
    pub const PROPERTY_TRANSACTION_ID: &'static str = "__transactionId__";
    pub const PROPERTY_TRANSACTION_PREPARED: &'static str = "TRAN_MSG";
    pub const PROPERTY_TRANSACTION_PREPARED_QUEUE_OFFSET: &'static str = "TRAN_PREPARED_QUEUE_OFFSET";
    pub const PROPERTY_TRANSFER_FLAG: &'static str = "TRANSFER_FLAG";
    /// Transient property for group system flags set by the client when pulling messages.
    pub const PROPERTY_TRANSIENT_GROUP_CONFIG: &'static str = "__RMQ.TRANSIENT.GROUP_SYS_FLAG";
    /// Prefix for transient properties that are not persisted to broker disk.
    pub const PROPERTY_TRANSIENT_PREFIX: &'static str = "__RMQ.TRANSIENT.";
    /// Transient property for topic system flags set by the client when pulling messages.
    pub const PROPERTY_TRANSIENT_TOPIC_CONFIG: &'static str = "__RMQ.TRANSIENT.TOPIC_SYS_FLAG";
    pub const PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX: &'static str = "UNIQ_KEY";
    /// Property name under which the "wait for store acknowledgment" switch is stored.
    pub const PROPERTY_WAIT_STORE_MSG_OK: &'static str = "WAIT";

    // ---- Timer engine --------------------------------------------------------------------

    /// Timer engine type identifier for RocksDB timeline implementation.
    pub const TIMER_ENGINE_ROCKSDB_TIMELINE: &'static str = "R";
    /// Timer engine type identifier for file-based time wheel implementation.
    pub const TIMER_ENGINE_FILE_TIME_WHEEL: &'static str = "F";
    /// Property name for timer engine type.
    pub const TIMER_ENGINE_TYPE: &'static str = "timerEngineType";

    // ---- Index types ---------------------------------------------------------------------

    /// Index type identifier for message key indexing.
    pub const INDEX_KEY_TYPE: &'static str = "K";
    /// Index type identifier for unique indexing.
    pub const INDEX_UNIQUE_TYPE: &'static str = "U";
    /// Index type identifier for tag indexing.
    pub const INDEX_TAG_TYPE: &'static str = "T";
}
506
507/// Set of system-reserved property names that cannot be used as user-defined properties.
508pub static STRING_HASH_SET: LazyLock<HashSet<&'static str>> = LazyLock::new(|| {
509    let mut set = HashSet::with_capacity(64);
510    set.insert(MessageConst::PROPERTY_TRACE_SWITCH);
511    set.insert(MessageConst::PROPERTY_MSG_REGION);
512    set.insert(MessageConst::PROPERTY_KEYS);
513    set.insert(MessageConst::PROPERTY_TAGS);
514    set.insert(MessageConst::PROPERTY_WAIT_STORE_MSG_OK);
515    set.insert(MessageConst::PROPERTY_DELAY_TIME_LEVEL);
516    set.insert(MessageConst::PROPERTY_RETRY_TOPIC);
517    set.insert(MessageConst::PROPERTY_REAL_TOPIC);
518    set.insert(MessageConst::PROPERTY_REAL_QUEUE_ID);
519    set.insert(MessageConst::PROPERTY_TRANSACTION_PREPARED);
520    set.insert(MessageConst::PROPERTY_PRODUCER_GROUP);
521    set.insert(MessageConst::PROPERTY_MIN_OFFSET);
522    set.insert(MessageConst::PROPERTY_MAX_OFFSET);
523    set.insert(MessageConst::PROPERTY_BUYER_ID);
524    set.insert(MessageConst::PROPERTY_ORIGIN_MESSAGE_ID);
525    set.insert(MessageConst::PROPERTY_TRANSFER_FLAG);
526    set.insert(MessageConst::PROPERTY_CORRECTION_FLAG);
527    set.insert(MessageConst::PROPERTY_MQ2_FLAG);
528    set.insert(MessageConst::PROPERTY_RECONSUME_TIME);
529    set.insert(MessageConst::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
530    set.insert(MessageConst::PROPERTY_MAX_RECONSUME_TIMES);
531    set.insert(MessageConst::PROPERTY_CONSUME_START_TIMESTAMP);
532    set.insert(MessageConst::PROPERTY_POP_CK);
533    set.insert(MessageConst::PROPERTY_POP_CK_OFFSET);
534    set.insert(MessageConst::PROPERTY_FIRST_POP_TIME);
535    set.insert(MessageConst::PROPERTY_TRANSACTION_PREPARED_QUEUE_OFFSET);
536    set.insert(MessageConst::DUP_INFO);
537    set.insert(MessageConst::PROPERTY_EXTEND_UNIQ_INFO);
538    set.insert(MessageConst::PROPERTY_INSTANCE_ID);
539    set.insert(MessageConst::PROPERTY_CORRELATION_ID);
540    set.insert(MessageConst::PROPERTY_MESSAGE_REPLY_TO_CLIENT);
541    set.insert(MessageConst::PROPERTY_MESSAGE_TTL);
542    set.insert(MessageConst::PROPERTY_REPLY_MESSAGE_ARRIVE_TIME);
543    set.insert(MessageConst::PROPERTY_PUSH_REPLY_TIME);
544    set.insert(MessageConst::PROPERTY_CLUSTER);
545    set.insert(MessageConst::PROPERTY_MESSAGE_TYPE);
546    set.insert(MessageConst::PROPERTY_INNER_MULTI_QUEUE_OFFSET);
547    set.insert(MessageConst::PROPERTY_TIMER_DELAY_MS);
548    set.insert(MessageConst::PROPERTY_TIMER_DELAY_SEC);
549    set.insert(MessageConst::PROPERTY_TIMER_DELIVER_MS);
550    set.insert(MessageConst::PROPERTY_TIMER_ENQUEUE_MS);
551    set.insert(MessageConst::PROPERTY_TIMER_DEQUEUE_MS);
552    set.insert(MessageConst::PROPERTY_TIMER_ROLL_TIMES);
553    set.insert(MessageConst::PROPERTY_TIMER_OUT_MS);
554    set.insert(MessageConst::PROPERTY_TIMER_DEL_UNIQKEY);
555    set.insert(MessageConst::PROPERTY_TIMER_DELAY_LEVEL);
556    set.insert(MessageConst::PROPERTY_BORN_HOST);
557    set.insert(MessageConst::PROPERTY_BORN_TIMESTAMP);
558    set.insert(MessageConst::PROPERTY_DLQ_ORIGIN_TOPIC);
559    set.insert(MessageConst::PROPERTY_DLQ_ORIGIN_MESSAGE_ID);
560    set.insert(MessageConst::PROPERTY_CRC32);
561    set
562});