rocketmq_common/common/attribute/
topic_attributes.rs1use std::collections::HashMap;
2use std::collections::HashSet;
3use std::sync::Arc;
4use std::sync::OnceLock;
5
6use cheetah_string::CheetahString;
7
8use crate::common::attribute::enum_attribute::EnumAttribute;
9use crate::common::attribute::long_range_attribute::LongRangeAttribute;
10use crate::common::attribute::topic_message_type::TopicMessageType;
11use crate::common::attribute::Attribute;
12use crate::hashset;
13
14pub struct TopicAttributes;
16
17impl TopicAttributes {
18 pub fn queue_type_attribute() -> &'static EnumAttribute {
20 static INSTANCE: OnceLock<EnumAttribute> = OnceLock::new();
21 INSTANCE.get_or_init(|| {
22 let mut valid_values = HashSet::new();
23 valid_values.insert("BatchCQ".into());
24 valid_values.insert("SimpleCQ".into());
25
26 EnumAttribute::new("queue.type".into(), false, valid_values, "SimpleCQ".into())
27 })
28 }
29
30 pub fn cleanup_policy_attribute() -> &'static EnumAttribute {
32 static INSTANCE: OnceLock<EnumAttribute> = OnceLock::new();
33 INSTANCE.get_or_init(|| {
34 let mut valid_values = HashSet::new();
35 valid_values.insert("DELETE".into());
36 valid_values.insert("COMPACTION".into());
37
38 EnumAttribute::new(
39 "cleanup.policy".into(),
40 false,
41 valid_values,
42 "DELETE".into(),
43 )
44 })
45 }
46
47 pub fn topic_message_type_attribute() -> &'static EnumAttribute {
49 static INSTANCE: OnceLock<EnumAttribute> = OnceLock::new();
50 INSTANCE.get_or_init(|| {
51 EnumAttribute::new(
52 "message.type".into(),
53 true,
54 TopicMessageType::topic_message_type_set()
55 .into_iter()
56 .map(|item| item.into())
57 .collect(),
58 TopicMessageType::Normal.to_string().into(),
59 )
60 })
61 }
62
63 pub fn topic_reserve_time_attribute() -> &'static LongRangeAttribute {
65 static INSTANCE: OnceLock<LongRangeAttribute> = OnceLock::new();
66 INSTANCE
67 .get_or_init(|| LongRangeAttribute::new("reserve.time".into(), true, -1, i64::MAX, -1))
68 }
69
70 pub fn all() -> &'static HashMap<CheetahString, Arc<dyn Attribute>> {
72 static ALL: OnceLock<HashMap<CheetahString, Arc<dyn Attribute>>> = OnceLock::new();
73 ALL.get_or_init(|| {
74 let mut map = HashMap::new();
75
76 let queue_type = Self::queue_type_attribute();
77 let cleanup_policy = Self::cleanup_policy_attribute();
78 let message_type = Self::topic_message_type_attribute();
79 let reserve_time = Self::topic_reserve_time_attribute();
80
81 map.insert(
82 queue_type.name().clone(),
83 Arc::new(queue_type.clone()) as Arc<dyn Attribute>,
84 );
85 map.insert(
86 cleanup_policy.name().clone(),
87 Arc::new(cleanup_policy.clone()) as Arc<dyn Attribute>,
88 );
89 map.insert(
90 message_type.name().clone(),
91 Arc::new(message_type.clone()) as Arc<dyn Attribute>,
92 );
93 map.insert(
94 reserve_time.name().clone(),
95 Arc::new(reserve_time.clone()) as Arc<dyn Attribute>,
96 );
97
98 map
99 })
100 }
101}
102
103#[cfg(test)]
104mod tests {
105 use std::sync::Arc;
106
107 use super::*;
108
109 #[test]
110 fn queue_type_attribute_default_value() {
111 let attribute = TopicAttributes::queue_type_attribute();
112 assert_eq!(attribute.default_value(), "SimpleCQ");
113 }
114
115 #[test]
116 fn queue_type_attribute_valid_values() {
117 let attribute = TopicAttributes::queue_type_attribute();
118 let valid_values: HashSet<CheetahString> = ["BatchCQ", "SimpleCQ"]
119 .iter()
120 .cloned()
121 .map(CheetahString::from)
122 .collect();
123 assert_eq!(attribute.universe(), &valid_values);
124 }
125
126 #[test]
127 fn cleanup_policy_attribute_default_value() {
128 let attribute = TopicAttributes::cleanup_policy_attribute();
129 assert_eq!(attribute.default_value(), "DELETE");
130 }
131
132 #[test]
133 fn cleanup_policy_attribute_valid_values() {
134 let attribute = TopicAttributes::cleanup_policy_attribute();
135 let valid_values: HashSet<CheetahString> = ["DELETE", "COMPACTION"]
136 .iter()
137 .cloned()
138 .map(CheetahString::from)
139 .collect();
140 assert_eq!(attribute.universe(), &valid_values);
141 }
142
143 #[test]
144 fn topic_message_type_attribute_default_value() {
145 let attribute = TopicAttributes::topic_message_type_attribute();
146 assert_eq!(attribute.default_value(), "NORMAL");
147 }
148
149 #[test]
150 fn topic_message_type_attribute_valid_values() {
151 let attribute = TopicAttributes::topic_message_type_attribute();
152 let valid_values: HashSet<CheetahString> = TopicMessageType::topic_message_type_set()
153 .into_iter()
154 .map(CheetahString::from)
155 .collect();
156 assert_eq!(attribute.universe(), &valid_values);
157 }
158
159 #[test]
160 fn topic_reserve_time_attribute_default_value() {
161 let attribute = TopicAttributes::topic_reserve_time_attribute();
162 assert_eq!(attribute.default_value(), -1);
163 }
164
165 #[test]
166 fn topic_reserve_time_attribute_valid_range() {
167 let attribute = TopicAttributes::topic_reserve_time_attribute();
168 assert_eq!(attribute.min(), -1);
169 assert_eq!(attribute.max(), i64::MAX);
170 }
171
172 #[test]
173 fn all_attributes_contains_all_defined_attributes() {
174 let all_attributes = TopicAttributes::all();
175 assert!(all_attributes.contains_key("queue.type"));
176 assert!(all_attributes.contains_key("cleanup.policy"));
177 assert!(all_attributes.contains_key("message.type"));
178 assert!(all_attributes.contains_key("reserve.time"));
179 }
180}