rocketmq_common/common/
key_builder.rs1use crate::common::mix_all::RETRY_GROUP_TOPIC_PREFIX;
18use crate::common::pop_ack_constants::PopAckConstants;
19
/// Queue id constant with the value `999`.
///
/// NOTE(review): not referenced anywhere in this file; presumably the queue id reserved for
/// reviving orderly pop messages — confirm against callers.
pub const POP_ORDER_REVIVE_QUEUE: i32 = 999;
/// Separator placed between the group (`cid`) and the topic in V1 pop retry topic names.
pub const POP_RETRY_SEPARATOR_V1: char = '_';
/// Separator placed between the group (`cid`) and the topic in V2 pop retry topic names.
pub const POP_RETRY_SEPARATOR_V2: char = '+';
/// Regex-escaped spelling of the V2 separator: the two characters `\` and `+`.
///
/// Only meaningful to a regular-expression engine. Rust's `str` pattern APIs (`split`,
/// `contains`, ...) match a `&str` pattern literally, so they would look for a backslash
/// followed by a plus sign.
pub const POP_RETRY_REGEX_SEPARATOR_V2: &str = "\\+";
24
/// Stateless unit struct whose associated functions build and parse pop retry topic names
/// and build polling keys.
pub struct KeyBuilder;
26
27impl KeyBuilder {
28 pub fn build_pop_retry_topic(topic: &str, cid: &str, enable_retry_v2: bool) -> String {
29 if enable_retry_v2 {
30 return KeyBuilder::build_pop_retry_topic_v2(topic, cid);
31 }
32 KeyBuilder::build_pop_retry_topic_v1(topic, cid)
33 }
34
35 pub fn build_pop_retry_topic_default(topic: &str, cid: &str) -> String {
36 format!("{RETRY_GROUP_TOPIC_PREFIX}{cid}{POP_RETRY_SEPARATOR_V1}{topic}")
37 }
38
39 pub fn build_pop_retry_topic_v2(topic: &str, cid: &str) -> String {
40 format!("{RETRY_GROUP_TOPIC_PREFIX}{cid}{POP_RETRY_SEPARATOR_V2}{topic}")
41 }
42
43 pub fn build_pop_retry_topic_v1(topic: &str, cid: &str) -> String {
44 format!("{RETRY_GROUP_TOPIC_PREFIX}{cid}{POP_RETRY_SEPARATOR_V1}{topic}")
45 }
46
47 pub fn parse_normal_topic(topic: &str, cid: &str) -> String {
48 if topic.starts_with(RETRY_GROUP_TOPIC_PREFIX) {
49 if topic.starts_with(&format!(
50 "{RETRY_GROUP_TOPIC_PREFIX}{cid}{POP_RETRY_SEPARATOR_V2}"
51 )) {
52 return topic[RETRY_GROUP_TOPIC_PREFIX.len() + cid.len() + 1..].to_string();
53 }
54 return topic[RETRY_GROUP_TOPIC_PREFIX.len() + cid.len() + 1..].to_string();
55 }
56 topic.to_string()
57 }
58
59 pub fn parse_normal_topic_default(retry_topic: &str) -> String {
60 if KeyBuilder::is_pop_retry_topic_v2(retry_topic) {
61 let result: Vec<&str> = retry_topic.split(POP_RETRY_REGEX_SEPARATOR_V2).collect();
62 if result.len() == 2 {
63 return result[1].to_string();
64 }
65 }
66 retry_topic.to_string()
67 }
68
69 pub fn parse_group(retry_topic: &str) -> String {
70 if KeyBuilder::is_pop_retry_topic_v2(retry_topic) {
71 let result: Vec<&str> = retry_topic.split(POP_RETRY_REGEX_SEPARATOR_V2).collect();
72 if result.len() == 2 {
73 return result[0][RETRY_GROUP_TOPIC_PREFIX.len()..].to_string();
74 }
75 }
76 retry_topic[RETRY_GROUP_TOPIC_PREFIX.len()..].to_string()
77 }
78
79 pub fn build_polling_key(topic: &str, cid: &str, queue_id: i32) -> String {
80 format!(
81 "{}{}{}{}{}",
82 topic,
83 PopAckConstants::SPLIT,
84 cid,
85 PopAckConstants::SPLIT,
86 queue_id
87 )
88 }
89
90 pub fn is_pop_retry_topic_v2(retry_topic: &str) -> bool {
91 retry_topic.starts_with(RETRY_GROUP_TOPIC_PREFIX)
92 && retry_topic.contains(POP_RETRY_SEPARATOR_V2)
93 }
94}