rocketmq_common/common/attribute/
topic_attributes.rs

1use std::collections::HashMap;
2use std::collections::HashSet;
3use std::sync::Arc;
4use std::sync::OnceLock;
5
6use cheetah_string::CheetahString;
7
8use crate::common::attribute::enum_attribute::EnumAttribute;
9use crate::common::attribute::long_range_attribute::LongRangeAttribute;
10use crate::common::attribute::topic_message_type::TopicMessageType;
11use crate::common::attribute::Attribute;
12use crate::hashset;
13
14/// Defines attributes and configurations for RocketMQ topics
15pub struct TopicAttributes;
16
17impl TopicAttributes {
18    /// Queue type attribute defining storage structure (BatchCQ or SimpleCQ)
19    pub fn queue_type_attribute() -> &'static EnumAttribute {
20        static INSTANCE: OnceLock<EnumAttribute> = OnceLock::new();
21        INSTANCE.get_or_init(|| {
22            let mut valid_values = HashSet::new();
23            valid_values.insert("BatchCQ".into());
24            valid_values.insert("SimpleCQ".into());
25
26            EnumAttribute::new("queue.type".into(), false, valid_values, "SimpleCQ".into())
27        })
28    }
29
30    /// Cleanup policy attribute defining how messages are cleaned up (DELETE or COMPACTION)
31    pub fn cleanup_policy_attribute() -> &'static EnumAttribute {
32        static INSTANCE: OnceLock<EnumAttribute> = OnceLock::new();
33        INSTANCE.get_or_init(|| {
34            let mut valid_values = HashSet::new();
35            valid_values.insert("DELETE".into());
36            valid_values.insert("COMPACTION".into());
37
38            EnumAttribute::new(
39                "cleanup.policy".into(),
40                false,
41                valid_values,
42                "DELETE".into(),
43            )
44        })
45    }
46
47    /// Message type attribute defining the type of messages stored in the topic
48    pub fn topic_message_type_attribute() -> &'static EnumAttribute {
49        static INSTANCE: OnceLock<EnumAttribute> = OnceLock::new();
50        INSTANCE.get_or_init(|| {
51            EnumAttribute::new(
52                "message.type".into(),
53                true,
54                TopicMessageType::topic_message_type_set()
55                    .into_iter()
56                    .map(|item| item.into())
57                    .collect(),
58                TopicMessageType::Normal.to_string().into(),
59            )
60        })
61    }
62
63    /// Reserve time attribute defining how long messages are kept
64    pub fn topic_reserve_time_attribute() -> &'static LongRangeAttribute {
65        static INSTANCE: OnceLock<LongRangeAttribute> = OnceLock::new();
66        INSTANCE
67            .get_or_init(|| LongRangeAttribute::new("reserve.time".into(), true, -1, i64::MAX, -1))
68    }
69
70    /// Returns all defined attributes in a HashMap
71    pub fn all() -> &'static HashMap<CheetahString, Arc<dyn Attribute>> {
72        static ALL: OnceLock<HashMap<CheetahString, Arc<dyn Attribute>>> = OnceLock::new();
73        ALL.get_or_init(|| {
74            let mut map = HashMap::new();
75
76            let queue_type = Self::queue_type_attribute();
77            let cleanup_policy = Self::cleanup_policy_attribute();
78            let message_type = Self::topic_message_type_attribute();
79            let reserve_time = Self::topic_reserve_time_attribute();
80
81            map.insert(
82                queue_type.name().clone(),
83                Arc::new(queue_type.clone()) as Arc<dyn Attribute>,
84            );
85            map.insert(
86                cleanup_policy.name().clone(),
87                Arc::new(cleanup_policy.clone()) as Arc<dyn Attribute>,
88            );
89            map.insert(
90                message_type.name().clone(),
91                Arc::new(message_type.clone()) as Arc<dyn Attribute>,
92            );
93            map.insert(
94                reserve_time.name().clone(),
95                Arc::new(reserve_time.clone()) as Arc<dyn Attribute>,
96            );
97
98            map
99        })
100    }
101}
102
103#[cfg(test)]
104mod tests {
105    use std::sync::Arc;
106
107    use super::*;
108
109    #[test]
110    fn queue_type_attribute_default_value() {
111        let attribute = TopicAttributes::queue_type_attribute();
112        assert_eq!(attribute.default_value(), "SimpleCQ");
113    }
114
115    #[test]
116    fn queue_type_attribute_valid_values() {
117        let attribute = TopicAttributes::queue_type_attribute();
118        let valid_values: HashSet<CheetahString> = ["BatchCQ", "SimpleCQ"]
119            .iter()
120            .cloned()
121            .map(CheetahString::from)
122            .collect();
123        assert_eq!(attribute.universe(), &valid_values);
124    }
125
126    #[test]
127    fn cleanup_policy_attribute_default_value() {
128        let attribute = TopicAttributes::cleanup_policy_attribute();
129        assert_eq!(attribute.default_value(), "DELETE");
130    }
131
132    #[test]
133    fn cleanup_policy_attribute_valid_values() {
134        let attribute = TopicAttributes::cleanup_policy_attribute();
135        let valid_values: HashSet<CheetahString> = ["DELETE", "COMPACTION"]
136            .iter()
137            .cloned()
138            .map(CheetahString::from)
139            .collect();
140        assert_eq!(attribute.universe(), &valid_values);
141    }
142
143    #[test]
144    fn topic_message_type_attribute_default_value() {
145        let attribute = TopicAttributes::topic_message_type_attribute();
146        assert_eq!(attribute.default_value(), "NORMAL");
147    }
148
149    #[test]
150    fn topic_message_type_attribute_valid_values() {
151        let attribute = TopicAttributes::topic_message_type_attribute();
152        let valid_values: HashSet<CheetahString> = TopicMessageType::topic_message_type_set()
153            .into_iter()
154            .map(CheetahString::from)
155            .collect();
156        assert_eq!(attribute.universe(), &valid_values);
157    }
158
159    #[test]
160    fn topic_reserve_time_attribute_default_value() {
161        let attribute = TopicAttributes::topic_reserve_time_attribute();
162        assert_eq!(attribute.default_value(), -1);
163    }
164
165    #[test]
166    fn topic_reserve_time_attribute_valid_range() {
167        let attribute = TopicAttributes::topic_reserve_time_attribute();
168        assert_eq!(attribute.min(), -1);
169        assert_eq!(attribute.max(), i64::MAX);
170    }
171
172    #[test]
173    fn all_attributes_contains_all_defined_attributes() {
174        let all_attributes = TopicAttributes::all();
175        assert!(all_attributes.contains_key("queue.type"));
176        assert!(all_attributes.contains_key("cleanup.policy"));
177        assert!(all_attributes.contains_key("message.type"));
178        assert!(all_attributes.contains_key("reserve.time"));
179    }
180}