rocketmq_common/common/
message.rs1use std::any::Any;
16use std::collections::HashMap;
17use std::collections::HashSet;
18use std::fmt;
19use std::fmt::Debug;
20use std::fmt::Display;
21use std::string::ToString;
22use std::sync::LazyLock;
23
24use bytes::Buf;
25use bytes::Bytes;
26use cheetah_string::CheetahString;
27use rocketmq_error::RocketMQError;
28use rocketmq_error::RocketMQResult;
29
30pub mod message_accessor;
31pub mod message_batch;
32pub mod message_batch_v2;
33pub mod message_body;
34pub mod message_builder;
35pub mod message_client_ext;
36pub mod message_client_id_setter;
37pub mod message_decoder;
38pub mod message_enum;
39pub mod message_ext;
40pub mod message_ext_broker_inner;
41pub mod message_flag;
42pub mod message_id;
43pub mod message_property;
44pub mod message_queue;
45pub mod message_queue_assignment;
46pub mod message_queue_for_c;
47pub mod message_single;
48
49pub mod broker_message;
51pub mod message_envelope;
52pub mod routing_context;
53pub mod storage_metadata;
54
55pub trait MessageTrait: Any + Display + Debug {
59 #[inline]
61 fn set_keys(&mut self, keys: CheetahString) {
62 self.put_property(CheetahString::from_static_str(MessageConst::PROPERTY_KEYS), keys);
63 }
64
65 fn put_property(&mut self, key: CheetahString, value: CheetahString);
67
68 fn clear_property(&mut self, name: &str);
70
71 fn put_user_property(&mut self, name: CheetahString, value: CheetahString) -> RocketMQResult<()> {
78 if name.is_empty() || value.is_empty() {
79 return Err(RocketMQError::InvalidProperty(
80 "The name or value of property can not be null or blank string!".to_string(),
81 ));
82 }
83 if STRING_HASH_SET.contains(name.as_str()) {
84 return Err(RocketMQError::InvalidProperty(format!(
85 "The Property<{name}> is used by system, input another please"
86 )));
87 }
88 self.put_property(name, value);
89 Ok(())
90 }
91
92 fn user_property(&self, name: &CheetahString) -> Option<CheetahString> {
94 self.property(name)
95 }
96
97 fn user_property_ref(&self, name: &CheetahString) -> Option<&CheetahString> {
99 self.property_ref(name)
100 }
101
102 fn property(&self, name: &CheetahString) -> Option<CheetahString>;
104
105 fn property_ref(&self, name: &CheetahString) -> Option<&CheetahString>;
107
108 fn topic(&self) -> &CheetahString;
110
111 fn set_topic(&mut self, topic: CheetahString);
113
114 fn tags(&self) -> Option<CheetahString> {
116 self.property(&CheetahString::from_static_str(MessageConst::PROPERTY_TAGS))
117 }
118
119 fn tags_ref(&self) -> Option<&CheetahString> {
121 self.property_ref(&CheetahString::from_static_str(MessageConst::PROPERTY_TAGS))
122 }
123
124 fn set_tags(&mut self, tags: CheetahString) {
126 self.put_property(CheetahString::from_static_str(MessageConst::PROPERTY_TAGS), tags);
127 }
128
129 fn get_keys(&self) -> Option<CheetahString> {
131 self.property(&CheetahString::from_static_str(MessageConst::PROPERTY_KEYS))
132 }
133 fn get_keys_ref(&self) -> Option<&CheetahString> {
135 self.property_ref(&CheetahString::from_static_str(MessageConst::PROPERTY_KEYS))
136 }
137 fn set_keys_from_collection(&mut self, key_collection: Vec<String>) {
139 let keys = key_collection.join(MessageConst::KEY_SEPARATOR);
140 self.set_keys(CheetahString::from_string(keys));
141 }
142
143 fn delay_time_level(&self) -> i32 {
145 self.property(&CheetahString::from_static_str(MessageConst::PROPERTY_DELAY_TIME_LEVEL))
146 .and_then(|v| v.parse().ok())
147 .unwrap_or(0)
148 }
149
150 fn set_delay_time_level(&mut self, level: i32) {
152 self.put_property(
153 CheetahString::from_static_str(MessageConst::PROPERTY_DELAY_TIME_LEVEL),
154 CheetahString::from_string(level.to_string()),
155 );
156 }
157
158 fn is_wait_store_msg_ok(&self) -> bool {
162 self.property(&CheetahString::from_static_str(
163 MessageConst::PROPERTY_WAIT_STORE_MSG_OK,
164 ))
165 .map(|v| v.as_str() != "false")
166 .unwrap_or(true)
167 }
168
169 fn set_wait_store_msg_ok(&mut self, wait_store_msg_ok: bool) {
171 self.put_property(
172 CheetahString::from_static_str(MessageConst::PROPERTY_WAIT_STORE_MSG_OK),
173 CheetahString::from_string(wait_store_msg_ok.to_string()),
174 );
175 }
176
177 fn set_instance_id(&mut self, instance_id: CheetahString) {
179 self.put_property(
180 CheetahString::from_static_str(MessageConst::PROPERTY_INSTANCE_ID),
181 instance_id,
182 );
183 }
184
185 fn get_flag(&self) -> i32;
187
188 fn set_flag(&mut self, flag: i32);
190
191 fn get_body(&self) -> Option<&Bytes>;
193
194 fn set_body(&mut self, body: Bytes);
196
197 fn get_properties(&self) -> &HashMap<CheetahString, CheetahString>;
199
200 fn set_properties(&mut self, properties: HashMap<CheetahString, CheetahString>);
202
203 fn buyer_id(&self) -> Option<CheetahString> {
205 self.property(&CheetahString::from_static_str(MessageConst::PROPERTY_BUYER_ID))
206 }
207
208 fn buyer_id_ref(&self) -> Option<&CheetahString> {
210 self.property_ref(&CheetahString::from_static_str(MessageConst::PROPERTY_BUYER_ID))
211 }
212
213 fn set_buyer_id(&mut self, buyer_id: CheetahString) {
215 self.put_property(
216 CheetahString::from_static_str(MessageConst::PROPERTY_BUYER_ID),
217 buyer_id,
218 );
219 }
220
221 fn transaction_id(&self) -> Option<&CheetahString>;
227
228 fn set_transaction_id(&mut self, transaction_id: CheetahString);
230
231 fn set_delay_time_sec(&mut self, sec: u64) {
233 self.put_property(
234 CheetahString::from_static_str(MessageConst::PROPERTY_TIMER_DELAY_SEC),
235 CheetahString::from_string(sec.to_string()),
236 );
237 }
238
239 fn get_delay_time_sec(&self) -> u64 {
241 self.property(&CheetahString::from_static_str(MessageConst::PROPERTY_TIMER_DELAY_SEC))
242 .and_then(|v| v.parse().ok())
243 .unwrap_or(0)
244 }
245
246 fn set_delay_time_ms(&mut self, time_ms: u64) {
248 self.put_property(
249 CheetahString::from_static_str(MessageConst::PROPERTY_TIMER_DELAY_MS),
250 CheetahString::from_string(time_ms.to_string()),
251 );
252 }
253
254 fn get_delay_time_ms(&self) -> u64 {
256 self.property(&CheetahString::from_static_str(MessageConst::PROPERTY_TIMER_DELAY_MS))
257 .and_then(|v| v.parse().ok())
258 .unwrap_or(0)
259 }
260
261 fn set_deliver_time_ms(&mut self, time_ms: u64) {
263 self.put_property(
264 CheetahString::from_static_str(MessageConst::PROPERTY_TIMER_DELIVER_MS),
265 CheetahString::from_string(time_ms.to_string()),
266 );
267 }
268
269 fn get_deliver_time_ms(&self) -> u64 {
271 self.property(&CheetahString::from_static_str(MessageConst::PROPERTY_TIMER_DELIVER_MS))
272 .and_then(|v| v.parse().ok())
273 .unwrap_or(0)
274 }
275
276 fn get_compressed_body_mut(&mut self) -> Option<&mut Bytes>;
278
279 fn get_compressed_body(&self) -> Option<&Bytes>;
281
282 fn set_compressed_body_mut(&mut self, compressed_body: Bytes);
284
285 fn take_body(&mut self) -> Option<Bytes>;
287
288 fn as_any(&self) -> &dyn Any;
290
291 fn as_any_mut(&mut self) -> &mut dyn Any;
293}
294
#[cfg(test)]
mod tests {
    use cheetah_string::CheetahString;

    use super::MessageTrait;
    use crate::common::message::message_builder::MessageBuilder;
    use crate::common::message::message_single::Message;

    /// Transaction id used on both the write and the read side of the test.
    const TX_ID: &str = "tx-123";

    /// A transaction id written through the trait must be readable back through a
    /// `&dyn MessageTrait`.
    #[test]
    fn trait_object_get_transaction_id_uses_default_compat_accessor() {
        let mut msg: Message = MessageBuilder::new()
            .topic("TopicTest")
            .body_slice(b"payload")
            .build_unchecked();
        // Call through the trait explicitly rather than with method syntax.
        <Message as MessageTrait>::set_transaction_id(&mut msg, CheetahString::from(TX_ID));

        let as_trait_object: &dyn MessageTrait = &msg;
        let expected = CheetahString::from(TX_ID);

        assert_eq!(as_trait_object.transaction_id(), Some(&expected));
    }
}
315
/// Magic code identifying [`MessageVersion::V1`].
///
/// Bit pattern `0xDAA320A7` read as a signed 32-bit integer, i.e. `-626843481`.
pub const MESSAGE_MAGIC_CODE_V1: i32 = i32::from_be_bytes([0xDA, 0xA3, 0x20, 0xA7]);

/// Magic code identifying [`MessageVersion::V2`].
///
/// Bit pattern `0xDAA320AB` read as a signed 32-bit integer, i.e. `-626843477`.
pub const MESSAGE_MAGIC_CODE_V2: i32 = i32::from_be_bytes([0xDA, 0xA3, 0x20, 0xAB]);
321
/// Message format version; each variant corresponds to one magic code and one
/// topic-length field width.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash)]
pub enum MessageVersion {
    /// First format version; this is the `Default`.
    #[default]
    V1,
    /// Second format version.
    V2,
}
329
330impl fmt::Display for MessageVersion {
331 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
332 match self {
333 MessageVersion::V1 => write!(f, "V1"),
334 MessageVersion::V2 => write!(f, "V2"),
335 }
336 }
337}
338
339impl MessageVersion {
340 pub fn value_of_magic_code(magic_code: i32) -> Result<MessageVersion, &'static str> {
346 match magic_code {
347 MESSAGE_MAGIC_CODE_V1 => Ok(MessageVersion::V1),
348 MESSAGE_MAGIC_CODE_V2 => Ok(MessageVersion::V2),
349 _ => Err("Invalid magicCode"),
350 }
351 }
352
353 pub fn get_magic_code(&self) -> i32 {
355 match self {
356 MessageVersion::V1 => MESSAGE_MAGIC_CODE_V1,
357 MessageVersion::V2 => MESSAGE_MAGIC_CODE_V2,
358 }
359 }
360
361 pub fn get_topic_length_size(&self) -> usize {
363 match self {
364 MessageVersion::V1 => 1,
365 MessageVersion::V2 => 2,
366 }
367 }
368
369 pub fn get_topic_length(&self, buffer: &mut Bytes) -> usize {
371 match self {
372 MessageVersion::V1 => buffer.get_u8() as usize,
373 MessageVersion::V2 => buffer.get_i16() as usize,
374 }
375 }
376
377 pub fn get_topic_length_at_index(&self, buffer: &[u8], index: usize) -> usize {
380 match self {
381 MessageVersion::V1 => buffer[index] as usize,
382 MessageVersion::V2 => ((buffer[index] as usize) << 8) | (buffer[index + 1] as usize),
383 }
384 }
385
386 pub fn put_topic_length(&self, buffer: &mut Vec<u8>, topic_length: usize) {
388 match self {
389 MessageVersion::V1 => buffer.push(topic_length as u8),
390 MessageVersion::V2 => {
391 buffer.push((topic_length >> 8) as u8);
392 buffer.push((topic_length & 0xFF) as u8);
393 }
394 }
395 }
396
397 pub fn is_v1(&self) -> bool {
399 match self {
400 MessageVersion::V1 => true,
401 MessageVersion::V2 => false,
402 }
403 }
404
405 pub fn is_v2(&self) -> bool {
407 match self {
408 MessageVersion::V1 => false,
409 MessageVersion::V2 => true,
410 }
411 }
412}
413
/// Namespace type for the well-known property names and related string constants.
///
/// It carries no data; every item is an associated constant.
pub struct MessageConst;

impl MessageConst {
    pub const DUP_INFO: &'static str = "DUP_INFO";
    /// Separator placed between individual keys when several are stored together.
    pub const KEY_SEPARATOR: &'static str = " ";

    // ---- Property names ----
    pub const PROPERTY_BORN_HOST: &'static str = "__BORNHOST";
    pub const PROPERTY_BORN_TIMESTAMP: &'static str = "BORN_TIMESTAMP";
    pub const PROPERTY_BUYER_ID: &'static str = "BUYER_ID";
    pub const PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS: &'static str = "CHECK_IMMUNITY_TIME_IN_SECONDS";
    pub const PROPERTY_CLUSTER: &'static str = "CLUSTER";
    pub const PROPERTY_CONSUME_START_TIMESTAMP: &'static str = "CONSUME_START_TIME";
    pub const PROPERTY_CORRECTION_FLAG: &'static str = "CORRECTION_FLAG";
    pub const PROPERTY_CORRELATION_ID: &'static str = "CORRELATION_ID";
    pub const PROPERTY_CRC32: &'static str = "__CRC32#";
    pub const PROPERTY_DELAY_TIME_LEVEL: &'static str = "DELAY";
    pub const PROPERTY_DLQ_ORIGIN_MESSAGE_ID: &'static str = "DLQ_ORIGIN_MESSAGE_ID";
    pub const PROPERTY_STARTDE_LIVER_TIME: &'static str = "__STARTDELIVERTIME";
    pub const PROPERTY_DLQ_ORIGIN_TOPIC: &'static str = "DLQ_ORIGIN_TOPIC";
    pub const PROPERTY_EXTEND_UNIQ_INFO: &'static str = "EXTEND_UNIQ_INFO";
    pub const PROPERTY_FIRST_POP_TIME: &'static str = "1ST_POP_TIME";
    pub const PROPERTY_FORWARD_QUEUE_ID: &'static str = "PROPERTY_FORWARD_QUEUE_ID";
    pub const PROPERTY_INNER_BASE: &'static str = "INNER_BASE";
    pub const PROPERTY_INNER_MULTI_DISPATCH: &'static str = "INNER_MULTI_DISPATCH";
    pub const PROPERTY_INNER_MULTI_QUEUE_OFFSET: &'static str = "INNER_MULTI_QUEUE_OFFSET";
    pub const PROPERTY_INNER_NUM: &'static str = "INNER_NUM";
    pub const PROPERTY_INSTANCE_ID: &'static str = "INSTANCE_ID";
    pub const PROPERTY_KEYS: &'static str = "KEYS";
    pub const PROPERTY_MAX_OFFSET: &'static str = "MAX_OFFSET";
    pub const PROPERTY_MAX_RECONSUME_TIMES: &'static str = "MAX_RECONSUME_TIMES";
    pub const PROPERTY_MESSAGE_REPLY_TO_CLIENT: &'static str = "REPLY_TO_CLIENT";
    pub const PROPERTY_MESSAGE_TTL: &'static str = "TTL";
    pub const PROPERTY_MESSAGE_TYPE: &'static str = "MSG_TYPE";
    pub const PROPERTY_MIN_OFFSET: &'static str = "MIN_OFFSET";
    pub const PROPERTY_MQ2_FLAG: &'static str = "MQ2_FLAG";
    pub const PROPERTY_MSG_REGION: &'static str = "MSG_REGION";
    pub const PROPERTY_ORIGIN_MESSAGE_ID: &'static str = "ORIGIN_MESSAGE_ID";
    pub const PROPERTY_POP_CK: &'static str = "POP_CK";
    pub const PROPERTY_POP_CK_OFFSET: &'static str = "POP_CK_OFFSET";
    pub const PROPERTY_PRODUCER_GROUP: &'static str = "PGROUP";
    pub const PROPERTY_PUSH_REPLY_TIME: &'static str = "PUSH_REPLY_TIME";
    pub const PROPERTY_REAL_QUEUE_ID: &'static str = "REAL_QID";
    pub const PROPERTY_REAL_TOPIC: &'static str = "REAL_TOPIC";
    pub const PROPERTY_RECONSUME_TIME: &'static str = "RECONSUME_TIME";
    pub const PROPERTY_REDIRECT: &'static str = "REDIRECT";
    pub const PROPERTY_REPLY_MESSAGE_ARRIVE_TIME: &'static str = "ARRIVE_TIME";
    pub const PROPERTY_RETRY_TOPIC: &'static str = "RETRY_TOPIC";
    pub const PROPERTY_SHARDING_KEY: &'static str = "__SHARDINGKEY";
    pub const PROPERTY_TAGS: &'static str = "TAGS";

    // ---- `PROPERTY_TIMER_*` names ----
    pub const PROPERTY_TIMER_DELAY_LEVEL: &'static str = "TIMER_DELAY_LEVEL";
    pub const PROPERTY_TIMER_DELAY_MS: &'static str = "TIMER_DELAY_MS";
    pub const PROPERTY_TIMER_DELAY_SEC: &'static str = "TIMER_DELAY_SEC";
    pub const PROPERTY_TIMER_DELIVER_MS: &'static str = "TIMER_DELIVER_MS";
    pub const PROPERTY_TIMER_DEL_UNIQKEY: &'static str = "TIMER_DEL_UNIQKEY";
    pub const PROPERTY_TIMER_DEQUEUE_MS: &'static str = "TIMER_DEQUEUE_MS";
    pub const PROPERTY_TIMER_ENQUEUE_MS: &'static str = "TIMER_ENQUEUE_MS";
    pub const PROPERTY_TIMER_OUT_MS: &'static str = "TIMER_OUT_MS";
    pub const PROPERTY_TIMER_ROLL_TIMES: &'static str = "TIMER_ROLL_TIMES";

    // ---- `PROPERTY_TRACE_*` / `PROPERTY_TRANS*` names ----
    pub const PROPERTY_TRACE_CONTEXT: &'static str = "TRACE_CONTEXT";
    pub const PROPERTY_TRACE_SWITCH: &'static str = "TRACE_ON";
    pub const PROPERTY_TRANSACTION_CHECK_TIMES: &'static str = "TRANSACTION_CHECK_TIMES";
    pub const PROPERTY_TRANSACTION_ID: &'static str = "__transactionId__";
    pub const PROPERTY_TRANSACTION_PREPARED: &'static str = "TRAN_MSG";
    pub const PROPERTY_TRANSACTION_PREPARED_QUEUE_OFFSET: &'static str = "TRAN_PREPARED_QUEUE_OFFSET";
    pub const PROPERTY_TRANSFER_FLAG: &'static str = "TRANSFER_FLAG";
    pub const PROPERTY_TRANSIENT_GROUP_CONFIG: &'static str = "__RMQ.TRANSIENT.GROUP_SYS_FLAG";
    /// Common prefix of the two `PROPERTY_TRANSIENT_*_CONFIG` values.
    pub const PROPERTY_TRANSIENT_PREFIX: &'static str = "__RMQ.TRANSIENT.";
    pub const PROPERTY_TRANSIENT_TOPIC_CONFIG: &'static str = "__RMQ.TRANSIENT.TOPIC_SYS_FLAG";

    pub const PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX: &'static str = "UNIQ_KEY";
    pub const PROPERTY_WAIT_STORE_MSG_OK: &'static str = "WAIT";

    // ---- `TIMER_ENGINE_*` values ----
    pub const TIMER_ENGINE_ROCKSDB_TIMELINE: &'static str = "R";
    pub const TIMER_ENGINE_FILE_TIME_WHEEL: &'static str = "F";
    pub const TIMER_ENGINE_TYPE: &'static str = "timerEngineType";

    // ---- `INDEX_*_TYPE` values ----
    pub const INDEX_KEY_TYPE: &'static str = "K";
    pub const INDEX_UNIQUE_TYPE: &'static str = "U";
    pub const INDEX_TAG_TYPE: &'static str = "T";
}
506
507pub static STRING_HASH_SET: LazyLock<HashSet<&'static str>> = LazyLock::new(|| {
509 let mut set = HashSet::with_capacity(64);
510 set.insert(MessageConst::PROPERTY_TRACE_SWITCH);
511 set.insert(MessageConst::PROPERTY_MSG_REGION);
512 set.insert(MessageConst::PROPERTY_KEYS);
513 set.insert(MessageConst::PROPERTY_TAGS);
514 set.insert(MessageConst::PROPERTY_WAIT_STORE_MSG_OK);
515 set.insert(MessageConst::PROPERTY_DELAY_TIME_LEVEL);
516 set.insert(MessageConst::PROPERTY_RETRY_TOPIC);
517 set.insert(MessageConst::PROPERTY_REAL_TOPIC);
518 set.insert(MessageConst::PROPERTY_REAL_QUEUE_ID);
519 set.insert(MessageConst::PROPERTY_TRANSACTION_PREPARED);
520 set.insert(MessageConst::PROPERTY_PRODUCER_GROUP);
521 set.insert(MessageConst::PROPERTY_MIN_OFFSET);
522 set.insert(MessageConst::PROPERTY_MAX_OFFSET);
523 set.insert(MessageConst::PROPERTY_BUYER_ID);
524 set.insert(MessageConst::PROPERTY_ORIGIN_MESSAGE_ID);
525 set.insert(MessageConst::PROPERTY_TRANSFER_FLAG);
526 set.insert(MessageConst::PROPERTY_CORRECTION_FLAG);
527 set.insert(MessageConst::PROPERTY_MQ2_FLAG);
528 set.insert(MessageConst::PROPERTY_RECONSUME_TIME);
529 set.insert(MessageConst::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
530 set.insert(MessageConst::PROPERTY_MAX_RECONSUME_TIMES);
531 set.insert(MessageConst::PROPERTY_CONSUME_START_TIMESTAMP);
532 set.insert(MessageConst::PROPERTY_POP_CK);
533 set.insert(MessageConst::PROPERTY_POP_CK_OFFSET);
534 set.insert(MessageConst::PROPERTY_FIRST_POP_TIME);
535 set.insert(MessageConst::PROPERTY_TRANSACTION_PREPARED_QUEUE_OFFSET);
536 set.insert(MessageConst::DUP_INFO);
537 set.insert(MessageConst::PROPERTY_EXTEND_UNIQ_INFO);
538 set.insert(MessageConst::PROPERTY_INSTANCE_ID);
539 set.insert(MessageConst::PROPERTY_CORRELATION_ID);
540 set.insert(MessageConst::PROPERTY_MESSAGE_REPLY_TO_CLIENT);
541 set.insert(MessageConst::PROPERTY_MESSAGE_TTL);
542 set.insert(MessageConst::PROPERTY_REPLY_MESSAGE_ARRIVE_TIME);
543 set.insert(MessageConst::PROPERTY_PUSH_REPLY_TIME);
544 set.insert(MessageConst::PROPERTY_CLUSTER);
545 set.insert(MessageConst::PROPERTY_MESSAGE_TYPE);
546 set.insert(MessageConst::PROPERTY_INNER_MULTI_QUEUE_OFFSET);
547 set.insert(MessageConst::PROPERTY_TIMER_DELAY_MS);
548 set.insert(MessageConst::PROPERTY_TIMER_DELAY_SEC);
549 set.insert(MessageConst::PROPERTY_TIMER_DELIVER_MS);
550 set.insert(MessageConst::PROPERTY_TIMER_ENQUEUE_MS);
551 set.insert(MessageConst::PROPERTY_TIMER_DEQUEUE_MS);
552 set.insert(MessageConst::PROPERTY_TIMER_ROLL_TIMES);
553 set.insert(MessageConst::PROPERTY_TIMER_OUT_MS);
554 set.insert(MessageConst::PROPERTY_TIMER_DEL_UNIQKEY);
555 set.insert(MessageConst::PROPERTY_TIMER_DELAY_LEVEL);
556 set.insert(MessageConst::PROPERTY_BORN_HOST);
557 set.insert(MessageConst::PROPERTY_BORN_TIMESTAMP);
558 set.insert(MessageConst::PROPERTY_DLQ_ORIGIN_TOPIC);
559 set.insert(MessageConst::PROPERTY_DLQ_ORIGIN_MESSAGE_ID);
560 set.insert(MessageConst::PROPERTY_CRC32);
561 set
562});