1use std::any::Any;
18use std::collections::HashMap;
19use std::collections::HashSet;
20use std::fmt;
21use std::fmt::Debug;
22use std::fmt::Display;
23use std::string::ToString;
24use std::sync::LazyLock;
25
26use bytes::Buf;
27use bytes::Bytes;
28use cheetah_string::CheetahString;
29use lazy_static::lazy_static;
30
31pub mod message_accessor;
32pub mod message_batch;
33pub mod message_client_ext;
34pub mod message_client_id_setter;
35pub mod message_decoder;
36pub mod message_enum;
37pub mod message_ext;
38pub mod message_ext_broker_inner;
39pub mod message_id;
40pub mod message_queue;
41pub mod message_queue_assignment;
42pub mod message_single;
43
44pub trait MessageTrait: Any + Display + Debug {
49 fn set_keys(&mut self, keys: CheetahString) {
55 self.put_property(
56 CheetahString::from_static_str(MessageConst::PROPERTY_KEYS),
57 keys,
58 );
59 }
60
61 fn put_property(&mut self, key: CheetahString, value: CheetahString);
68
69 fn clear_property(&mut self, name: &str);
75
76 fn put_user_property(&mut self, name: CheetahString, value: CheetahString) {
83 if STRING_HASH_SET.contains(name.as_str()) {
84 panic!("The Property<{name}> is used by system, input another please");
85 }
86 if value.is_empty() || name.is_empty() {
87 panic!("The name or value of property can not be null or blank string!");
88 }
89 self.put_property(name, value);
90 }
91
92 fn get_user_property(&self, name: &CheetahString) -> Option<CheetahString> {
102 self.get_property(name)
103 }
104
105 fn get_property(&self, name: &CheetahString) -> Option<CheetahString>;
115
116 fn get_topic(&self) -> &CheetahString;
122
123 fn set_topic(&mut self, topic: CheetahString);
129
130 fn get_tags(&self) -> Option<CheetahString> {
136 self.get_property(&CheetahString::from_static_str(MessageConst::PROPERTY_TAGS))
137 }
138
139 fn set_tags(&mut self, tags: CheetahString) {
145 self.put_property(
146 CheetahString::from_static_str(MessageConst::PROPERTY_TAGS),
147 tags,
148 );
149 }
150
151 fn get_keys(&self) -> Option<CheetahString> {
157 self.get_property(&CheetahString::from_static_str(MessageConst::PROPERTY_KEYS))
158 }
159
160 fn set_keys_from_collection(&mut self, key_collection: Vec<String>) {
166 let keys = key_collection.join(MessageConst::KEY_SEPARATOR);
167 self.set_keys(CheetahString::from_string(keys));
168 }
169
170 fn get_delay_time_level(&self) -> i32 {
176 self.get_property(&CheetahString::from_static_str(
177 MessageConst::PROPERTY_DELAY_TIME_LEVEL,
178 ))
179 .unwrap_or(CheetahString::from_slice("0"))
180 .parse()
181 .unwrap_or(0)
182 }
183
184 fn set_delay_time_level(&mut self, level: i32) {
190 self.put_property(
191 CheetahString::from_static_str(MessageConst::PROPERTY_DELAY_TIME_LEVEL),
192 CheetahString::from_string(level.to_string()),
193 );
194 }
195
196 fn is_wait_store_msg_ok(&self) -> bool {
202 self.get_property(&CheetahString::from_static_str(
203 MessageConst::PROPERTY_WAIT_STORE_MSG_OK,
204 ))
205 .unwrap_or(CheetahString::from_slice("true"))
206 .parse()
207 .unwrap_or(true)
208 }
209
210 fn set_wait_store_msg_ok(&mut self, wait_store_msg_ok: bool) {
216 self.put_property(
217 CheetahString::from_static_str(MessageConst::PROPERTY_WAIT_STORE_MSG_OK),
218 CheetahString::from_string(wait_store_msg_ok.to_string()),
219 );
220 }
221
222 fn set_instance_id(&mut self, instance_id: CheetahString) {
228 self.put_property(
229 CheetahString::from_static_str(MessageConst::PROPERTY_INSTANCE_ID),
230 instance_id,
231 );
232 }
233
234 fn get_flag(&self) -> i32;
240
241 fn set_flag(&mut self, flag: i32);
247
248 fn get_body(&self) -> Option<&Bytes>;
254
255 fn set_body(&mut self, body: Bytes);
261
262 fn get_properties(&self) -> &HashMap<CheetahString, CheetahString>;
268
269 fn set_properties(&mut self, properties: HashMap<CheetahString, CheetahString>);
275
276 fn get_buyer_id(&self) -> Option<CheetahString> {
282 self.get_property(&CheetahString::from_static_str(
283 MessageConst::PROPERTY_BUYER_ID,
284 ))
285 }
286
287 fn set_buyer_id(&mut self, buyer_id: CheetahString) {
293 self.put_property(
294 CheetahString::from_static_str(MessageConst::PROPERTY_BUYER_ID),
295 buyer_id,
296 );
297 }
298
299 fn get_transaction_id(&self) -> Option<&CheetahString>;
305
306 fn set_transaction_id(&mut self, transaction_id: CheetahString);
312
313 fn set_delay_time_sec(&mut self, sec: u64) {
319 self.put_property(
320 CheetahString::from_static_str(MessageConst::PROPERTY_TIMER_DELAY_SEC),
321 CheetahString::from_string(sec.to_string()),
322 );
323 }
324
325 fn get_delay_time_sec(&self) -> u64 {
331 self.get_property(&CheetahString::from_static_str(
332 MessageConst::PROPERTY_TIMER_DELAY_SEC,
333 ))
334 .unwrap_or(CheetahString::from_slice("0"))
335 .parse()
336 .unwrap_or(0)
337 }
338
339 fn set_delay_time_ms(&mut self, time_ms: u64) {
345 self.put_property(
346 CheetahString::from_static_str(MessageConst::PROPERTY_TIMER_DELAY_MS),
347 CheetahString::from_string(time_ms.to_string()),
348 );
349 }
350
351 fn get_delay_time_ms(&self) -> u64 {
357 self.get_property(&CheetahString::from_static_str(
358 MessageConst::PROPERTY_TIMER_DELAY_MS,
359 ))
360 .unwrap_or(CheetahString::from_slice("0"))
361 .parse()
362 .unwrap_or(0)
363 }
364
365 fn set_deliver_time_ms(&mut self, time_ms: u64) {
371 self.put_property(
372 CheetahString::from_static_str(MessageConst::PROPERTY_TIMER_DELIVER_MS),
373 CheetahString::from_string(time_ms.to_string()),
374 );
375 }
376
377 fn get_deliver_time_ms(&self) -> u64 {
383 self.get_property(&CheetahString::from_static_str(
384 MessageConst::PROPERTY_TIMER_DELIVER_MS,
385 ))
386 .unwrap_or(CheetahString::from_slice("0"))
387 .parse()
388 .unwrap_or(0)
389 }
390
391 fn get_compressed_body_mut(&mut self) -> &mut Option<Bytes>;
396
397 fn get_compressed_body(&self) -> Option<&Bytes>;
402
403 fn set_compressed_body_mut(&mut self, compressed_body: Bytes);
408
409 fn take_body(&mut self) -> Option<Bytes>;
414
415 fn as_any(&self) -> &dyn Any;
421
422 fn as_any_mut(&mut self) -> &mut dyn Any;
428}
429
/// Magic code that [`MessageVersion::value_of_magic_code`] maps to `MessageVersion::V1`.
pub const MESSAGE_MAGIC_CODE_V1: i32 = -626_843_481;
/// Magic code that [`MessageVersion::value_of_magic_code`] maps to `MessageVersion::V2`.
pub const MESSAGE_MAGIC_CODE_V2: i32 = -626_843_477;
432
/// Message encoding version.
///
/// `V1` is the default variant.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash)]
pub enum MessageVersion {
    /// First version; selected by `Default`.
    #[default]
    V1,
    /// Second version.
    V2,
}
439
440impl fmt::Display for MessageVersion {
441 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
442 match self {
443 MessageVersion::V1 => write!(f, "V1"),
444 MessageVersion::V2 => write!(f, "V2"),
445 }
446 }
447}
448
449impl MessageVersion {
450 pub fn value_of_magic_code(magic_code: i32) -> Result<MessageVersion, &'static str> {
451 match magic_code {
452 MESSAGE_MAGIC_CODE_V1 => Ok(MessageVersion::V1),
453 MESSAGE_MAGIC_CODE_V2 => Ok(MessageVersion::V2),
454 _ => Err("Invalid magicCode"),
455 }
456 }
457
458 pub fn get_magic_code(&self) -> i32 {
459 match self {
460 MessageVersion::V1 => MESSAGE_MAGIC_CODE_V1,
461 MessageVersion::V2 => MESSAGE_MAGIC_CODE_V2,
462 }
463 }
464
465 pub fn get_topic_length_size(&self) -> usize {
466 match self {
467 MessageVersion::V1 => 1,
468 MessageVersion::V2 => 2,
469 }
470 }
471
472 pub fn get_topic_length(&self, buffer: &mut Bytes) -> usize {
473 match self {
474 MessageVersion::V1 => buffer.get_u8() as usize,
475 MessageVersion::V2 => buffer.get_i16() as usize,
476 }
477 }
478
479 pub fn get_topic_length_at_index(&self, buffer: &[u8], index: usize) -> usize {
480 match self {
481 MessageVersion::V1 => buffer[index] as usize,
482 MessageVersion::V2 => ((buffer[index] as usize) << 8) | (buffer[index + 1] as usize),
483 }
484 }
485
486 pub fn put_topic_length(&self, buffer: &mut Vec<u8>, topic_length: usize) {
487 match self {
488 MessageVersion::V1 => buffer.push(topic_length as u8),
489 MessageVersion::V2 => {
490 buffer.push((topic_length >> 8) as u8);
491 buffer.push((topic_length & 0xFF) as u8);
492 }
493 }
494 }
495
496 pub fn is_v1(&self) -> bool {
497 match self {
498 MessageVersion::V1 => true,
499 MessageVersion::V2 => false,
500 }
501 }
502
503 pub fn is_v2(&self) -> bool {
504 match self {
505 MessageVersion::V1 => false,
506 MessageVersion::V2 => true,
507 }
508 }
509}
510
/// Namespace for the well-known message property names and related string constants.
///
/// The constants are grouped below by name family purely for readability; the
/// grouping carries no runtime meaning.
pub struct MessageConst;

impl MessageConst {
    // ---- Separator -------------------------------------------------------------

    /// Separator used when several keys are joined into a single `KEYS` value.
    pub const KEY_SEPARATOR: &'static str = " ";

    // ---- General message attributes --------------------------------------------

    pub const PROPERTY_KEYS: &'static str = "KEYS";
    pub const PROPERTY_TAGS: &'static str = "TAGS";
    pub const PROPERTY_WAIT_STORE_MSG_OK: &'static str = "WAIT";
    pub const PROPERTY_INSTANCE_ID: &'static str = "INSTANCE_ID";
    pub const PROPERTY_BUYER_ID: &'static str = "BUYER_ID";
    pub const PROPERTY_MESSAGE_TYPE: &'static str = "MSG_TYPE";
    pub const PROPERTY_MESSAGE_TTL: &'static str = "TTL";
    pub const PROPERTY_MSG_REGION: &'static str = "MSG_REGION";
    pub const PROPERTY_CLUSTER: &'static str = "CLUSTER";
    pub const PROPERTY_PRODUCER_GROUP: &'static str = "PGROUP";
    pub const PROPERTY_SHARDING_KEY: &'static str = "__SHARDINGKEY";
    pub const PROPERTY_BORN_HOST: &'static str = "__BORNHOST";
    pub const PROPERTY_BORN_TIMESTAMP: &'static str = "BORN_TIMESTAMP";
    pub const PROPERTY_CRC32: &'static str = "__CRC32#";
    pub const PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX: &'static str = "UNIQ_KEY";
    pub const PROPERTY_EXTEND_UNIQ_INFO: &'static str = "EXTEND_UNIQ_INFO";
    pub const DUP_INFO: &'static str = "DUP_INFO";

    // ---- Delay / timer family ----------------------------------------------------

    pub const PROPERTY_DELAY_TIME_LEVEL: &'static str = "DELAY";
    pub const PROPERTY_STARTDE_LIVER_TIME: &'static str = "__STARTDELIVERTIME";
    pub const PROPERTY_TIMER_DELAY_LEVEL: &'static str = "TIMER_DELAY_LEVEL";
    pub const PROPERTY_TIMER_DELAY_MS: &'static str = "TIMER_DELAY_MS";
    pub const PROPERTY_TIMER_DELAY_SEC: &'static str = "TIMER_DELAY_SEC";
    pub const PROPERTY_TIMER_DELIVER_MS: &'static str = "TIMER_DELIVER_MS";
    pub const PROPERTY_TIMER_DEL_UNIQKEY: &'static str = "TIMER_DEL_UNIQKEY";
    pub const PROPERTY_TIMER_DEQUEUE_MS: &'static str = "TIMER_DEQUEUE_MS";
    pub const PROPERTY_TIMER_ENQUEUE_MS: &'static str = "TIMER_ENQUEUE_MS";
    pub const PROPERTY_TIMER_OUT_MS: &'static str = "TIMER_OUT_MS";
    pub const PROPERTY_TIMER_ROLL_TIMES: &'static str = "TIMER_ROLL_TIMES";

    // ---- Transaction family ------------------------------------------------------

    pub const PROPERTY_TRANSACTION_ID: &'static str = "__transactionId__";
    pub const PROPERTY_TRANSACTION_PREPARED: &'static str = "TRAN_MSG";
    pub const PROPERTY_TRANSACTION_PREPARED_QUEUE_OFFSET: &'static str =
        "TRAN_PREPARED_QUEUE_OFFSET";
    pub const PROPERTY_TRANSACTION_CHECK_TIMES: &'static str = "TRANSACTION_CHECK_TIMES";
    pub const PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS: &'static str =
        "CHECK_IMMUNITY_TIME_IN_SECONDS";

    // ---- Retry / reconsume / DLQ / routing family --------------------------------

    pub const PROPERTY_RETRY_TOPIC: &'static str = "RETRY_TOPIC";
    pub const PROPERTY_REAL_TOPIC: &'static str = "REAL_TOPIC";
    pub const PROPERTY_REAL_QUEUE_ID: &'static str = "REAL_QID";
    pub const PROPERTY_RECONSUME_TIME: &'static str = "RECONSUME_TIME";
    pub const PROPERTY_MAX_RECONSUME_TIMES: &'static str = "MAX_RECONSUME_TIMES";
    pub const PROPERTY_ORIGIN_MESSAGE_ID: &'static str = "ORIGIN_MESSAGE_ID";
    pub const PROPERTY_DLQ_ORIGIN_TOPIC: &'static str = "DLQ_ORIGIN_TOPIC";
    pub const PROPERTY_DLQ_ORIGIN_MESSAGE_ID: &'static str = "DLQ_ORIGIN_MESSAGE_ID";
    pub const PROPERTY_REDIRECT: &'static str = "REDIRECT";
    pub const PROPERTY_FORWARD_QUEUE_ID: &'static str = "PROPERTY_FORWARD_QUEUE_ID";

    // ---- Offset / consume-time family --------------------------------------------

    pub const PROPERTY_MIN_OFFSET: &'static str = "MIN_OFFSET";
    pub const PROPERTY_MAX_OFFSET: &'static str = "MAX_OFFSET";
    pub const PROPERTY_CONSUME_START_TIMESTAMP: &'static str = "CONSUME_START_TIME";

    // ---- POP family --------------------------------------------------------------

    pub const PROPERTY_POP_CK: &'static str = "POP_CK";
    pub const PROPERTY_POP_CK_OFFSET: &'static str = "POP_CK_OFFSET";
    pub const PROPERTY_FIRST_POP_TIME: &'static str = "1ST_POP_TIME";

    // ---- Request / reply family --------------------------------------------------

    pub const PROPERTY_CORRELATION_ID: &'static str = "CORRELATION_ID";
    pub const PROPERTY_MESSAGE_REPLY_TO_CLIENT: &'static str = "REPLY_TO_CLIENT";
    pub const PROPERTY_REPLY_MESSAGE_ARRIVE_TIME: &'static str = "ARRIVE_TIME";
    pub const PROPERTY_PUSH_REPLY_TIME: &'static str = "PUSH_REPLY_TIME";

    // ---- INNER_* family ----------------------------------------------------------

    pub const PROPERTY_INNER_BASE: &'static str = "INNER_BASE";
    pub const PROPERTY_INNER_MULTI_DISPATCH: &'static str = "INNER_MULTI_DISPATCH";
    pub const PROPERTY_INNER_MULTI_QUEUE_OFFSET: &'static str = "INNER_MULTI_QUEUE_OFFSET";
    pub const PROPERTY_INNER_NUM: &'static str = "INNER_NUM";

    // ---- *_FLAG family -----------------------------------------------------------

    pub const PROPERTY_TRANSFER_FLAG: &'static str = "TRANSFER_FLAG";
    pub const PROPERTY_CORRECTION_FLAG: &'static str = "CORRECTION_FLAG";
    pub const PROPERTY_MQ2_FLAG: &'static str = "MQ2_FLAG";

    // ---- Trace family ------------------------------------------------------------

    pub const PROPERTY_TRACE_CONTEXT: &'static str = "TRACE_CONTEXT";
    pub const PROPERTY_TRACE_SWITCH: &'static str = "TRACE_ON";

    // ---- `__RMQ.TRANSIENT.` family -----------------------------------------------

    pub const PROPERTY_TRANSIENT_PREFIX: &'static str = "__RMQ.TRANSIENT.";
    pub const PROPERTY_TRANSIENT_GROUP_CONFIG: &'static str = "__RMQ.TRANSIENT.GROUP_SYS_FLAG";
    pub const PROPERTY_TRANSIENT_TOPIC_CONFIG: &'static str = "__RMQ.TRANSIENT.TOPIC_SYS_FLAG";
}
596
597pub static STRING_HASH_SET: LazyLock<HashSet<&'static str>> = LazyLock::new(|| {
598 let mut set = HashSet::with_capacity(64);
599 set.insert(MessageConst::PROPERTY_TRACE_SWITCH);
600 set.insert(MessageConst::PROPERTY_MSG_REGION);
601 set.insert(MessageConst::PROPERTY_KEYS);
602 set.insert(MessageConst::PROPERTY_TAGS);
603 set.insert(MessageConst::PROPERTY_WAIT_STORE_MSG_OK);
604 set.insert(MessageConst::PROPERTY_DELAY_TIME_LEVEL);
605 set.insert(MessageConst::PROPERTY_RETRY_TOPIC);
606 set.insert(MessageConst::PROPERTY_REAL_TOPIC);
607 set.insert(MessageConst::PROPERTY_REAL_QUEUE_ID);
608 set.insert(MessageConst::PROPERTY_TRANSACTION_PREPARED);
609 set.insert(MessageConst::PROPERTY_PRODUCER_GROUP);
610 set.insert(MessageConst::PROPERTY_MIN_OFFSET);
611 set.insert(MessageConst::PROPERTY_MAX_OFFSET);
612 set.insert(MessageConst::PROPERTY_BUYER_ID);
613 set.insert(MessageConst::PROPERTY_ORIGIN_MESSAGE_ID);
614 set.insert(MessageConst::PROPERTY_TRANSFER_FLAG);
615 set.insert(MessageConst::PROPERTY_CORRECTION_FLAG);
616 set.insert(MessageConst::PROPERTY_MQ2_FLAG);
617 set.insert(MessageConst::PROPERTY_RECONSUME_TIME);
618 set.insert(MessageConst::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
619 set.insert(MessageConst::PROPERTY_MAX_RECONSUME_TIMES);
620 set.insert(MessageConst::PROPERTY_CONSUME_START_TIMESTAMP);
621 set.insert(MessageConst::PROPERTY_POP_CK);
622 set.insert(MessageConst::PROPERTY_POP_CK_OFFSET);
623 set.insert(MessageConst::PROPERTY_FIRST_POP_TIME);
624 set.insert(MessageConst::PROPERTY_TRANSACTION_PREPARED_QUEUE_OFFSET);
625 set.insert(MessageConst::DUP_INFO);
626 set.insert(MessageConst::PROPERTY_EXTEND_UNIQ_INFO);
627 set.insert(MessageConst::PROPERTY_INSTANCE_ID);
628 set.insert(MessageConst::PROPERTY_CORRELATION_ID);
629 set.insert(MessageConst::PROPERTY_MESSAGE_REPLY_TO_CLIENT);
630 set.insert(MessageConst::PROPERTY_MESSAGE_TTL);
631 set.insert(MessageConst::PROPERTY_REPLY_MESSAGE_ARRIVE_TIME);
632 set.insert(MessageConst::PROPERTY_PUSH_REPLY_TIME);
633 set.insert(MessageConst::PROPERTY_CLUSTER);
634 set.insert(MessageConst::PROPERTY_MESSAGE_TYPE);
635 set.insert(MessageConst::PROPERTY_INNER_MULTI_QUEUE_OFFSET);
636 set.insert(MessageConst::PROPERTY_TIMER_DELAY_MS);
637 set.insert(MessageConst::PROPERTY_TIMER_DELAY_SEC);
638 set.insert(MessageConst::PROPERTY_TIMER_DELIVER_MS);
639 set.insert(MessageConst::PROPERTY_TIMER_ENQUEUE_MS);
640 set.insert(MessageConst::PROPERTY_TIMER_DEQUEUE_MS);
641 set.insert(MessageConst::PROPERTY_TIMER_ROLL_TIMES);
642 set.insert(MessageConst::PROPERTY_TIMER_OUT_MS);
643 set.insert(MessageConst::PROPERTY_TIMER_DEL_UNIQKEY);
644 set.insert(MessageConst::PROPERTY_TIMER_DELAY_LEVEL);
645 set.insert(MessageConst::PROPERTY_BORN_HOST);
646 set.insert(MessageConst::PROPERTY_BORN_TIMESTAMP);
647 set.insert(MessageConst::PROPERTY_DLQ_ORIGIN_TOPIC);
648 set.insert(MessageConst::PROPERTY_DLQ_ORIGIN_MESSAGE_ID);
649 set.insert(MessageConst::PROPERTY_CRC32);
650 set
651});