Skip to main content

rocketmq_common/common/
config.rs

1// Copyright 2023 The RocketMQ Rust Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::fmt::Display;
17
18use cheetah_string::CheetahString;
19use serde::Deserialize;
20use serde::Serialize;
21
22use super::TopicFilterType;
23use crate::common::attribute::topic_message_type::TopicMessageType;
24use crate::common::attribute::Attribute;
25use crate::common::constant::PermName;
26use crate::TopicAttributes::TopicAttributes;
27
28#[derive(Clone, Serialize, Deserialize, Debug, PartialEq)]
29#[serde(rename_all = "camelCase")]
30pub struct TopicConfig {
31    pub topic_name: Option<CheetahString>,
32    pub read_queue_nums: u32,
33    pub write_queue_nums: u32,
34    pub perm: u32,
35    pub topic_filter_type: TopicFilterType,
36    pub topic_sys_flag: u32,
37    pub order: bool,
38    pub attributes: HashMap<CheetahString, CheetahString>,
39}
40
41impl Display for TopicConfig {
42    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
43        write!(
44            f,
45            "TopicConfig {{ topic_name: {:?}, read_queue_nums: {}, write_queue_nums: {}, perm: {}, topic_filter_type: \
46             {}, topic_sys_flag: {}, order: {}, attributes: {:?} }}",
47            self.topic_name,
48            self.read_queue_nums,
49            self.write_queue_nums,
50            self.perm,
51            self.topic_filter_type,
52            self.topic_sys_flag,
53            self.order,
54            self.attributes
55        )
56    }
57}
58
59impl Default for TopicConfig {
60    fn default() -> Self {
61        Self {
62            topic_name: None,
63            read_queue_nums: Self::DEFAULT_READ_QUEUE_NUMS,
64            write_queue_nums: Self::DEFAULT_WRITE_QUEUE_NUMS,
65            perm: PermName::PERM_READ | PermName::PERM_WRITE,
66            topic_filter_type: TopicFilterType::SingleTag,
67            topic_sys_flag: 0,
68            order: false,
69            attributes: HashMap::new(),
70        }
71    }
72}
73
74impl TopicConfig {
75    const DEFAULT_READ_QUEUE_NUMS: u32 = 16;
76    const DEFAULT_WRITE_QUEUE_NUMS: u32 = 16;
77    const SEPARATOR: &'static str = " ";
78
79    pub fn get_topic_message_type(&self) -> TopicMessageType {
80        if self.attributes.is_empty() {
81            return TopicMessageType::Normal;
82        }
83        let content = self
84            .attributes
85            .get(TopicAttributes::topic_message_type_attribute().name());
86        if let Some(content) = content {
87            return TopicMessageType::from(content.to_string());
88        }
89        TopicMessageType::Normal
90    }
91
92    pub fn new(topic_name: impl Into<CheetahString>) -> Self {
93        TopicConfig {
94            topic_name: Some(topic_name.into()),
95            ..Self::default()
96        }
97    }
98
99    pub fn with_queues(topic_name: impl Into<CheetahString>, read_queue_nums: u32, write_queue_nums: u32) -> Self {
100        Self {
101            read_queue_nums,
102            write_queue_nums,
103            ..Self::new(topic_name)
104        }
105    }
106
107    pub fn with_perm(
108        topic_name: impl Into<CheetahString>,
109        read_queue_nums: u32,
110        write_queue_nums: u32,
111        perm: u32,
112    ) -> Self {
113        Self {
114            read_queue_nums,
115            write_queue_nums,
116            perm,
117            ..Self::new(topic_name)
118        }
119    }
120
121    pub fn with_sys_flag(
122        topic_name: impl Into<CheetahString>,
123        read_queue_nums: u32,
124        write_queue_nums: u32,
125        perm: u32,
126        topic_sys_flag: u32,
127    ) -> Self {
128        Self {
129            read_queue_nums,
130            write_queue_nums,
131            perm,
132            topic_sys_flag,
133            ..Self::new(topic_name)
134        }
135    }
136
137    pub fn encode(&self) -> String {
138        let mut sb = String::new();
139        sb.push_str(self.topic_name.clone().unwrap_or(CheetahString::empty()).as_str());
140        sb.push_str(Self::SEPARATOR);
141        sb.push_str(&self.read_queue_nums.to_string());
142        sb.push_str(Self::SEPARATOR);
143        sb.push_str(&self.write_queue_nums.to_string());
144        sb.push_str(Self::SEPARATOR);
145        sb.push_str(&self.perm.to_string());
146        sb.push_str(Self::SEPARATOR);
147        sb.push_str(&format!("{}", self.topic_filter_type));
148        sb.push_str(Self::SEPARATOR);
149        if !self.attributes.is_empty() {
150            sb.push_str(&serde_json::to_string(&self.attributes).unwrap());
151        }
152        sb.trim().to_string()
153    }
154
155    pub fn decode(&mut self, input: &str) -> bool {
156        let parts: Vec<&str> = input.split(Self::SEPARATOR).collect();
157        if parts.len() >= 5 {
158            self.topic_name = Some(parts[0].into());
159            self.read_queue_nums = parts[1].parse().unwrap_or(Self::DEFAULT_READ_QUEUE_NUMS);
160            self.write_queue_nums = parts[2].parse().unwrap_or(Self::DEFAULT_WRITE_QUEUE_NUMS);
161            self.perm = parts[3].parse().unwrap_or(PermName::PERM_READ | PermName::PERM_WRITE);
162            self.topic_filter_type = From::from(parts[4]);
163            if parts.len() >= 6 {
164                if let Ok(attrs) = serde_json::from_str(parts[5]) {
165                    self.attributes = attrs
166                }
167            }
168            true
169        } else {
170            false
171        }
172    }
173
174    pub fn get_read_queue_nums(&self) -> u32 {
175        self.read_queue_nums
176    }
177}
178
179#[cfg(test)]
180mod tests {
181    use std::collections::HashMap;
182
183    use cheetah_string::CheetahString;
184
185    use super::*;
186
187    #[test]
188    fn default_topic_config() {
189        let config = TopicConfig::default();
190        assert_eq!(config.topic_name, None);
191        assert_eq!(config.read_queue_nums, TopicConfig::DEFAULT_READ_QUEUE_NUMS);
192        assert_eq!(config.write_queue_nums, TopicConfig::DEFAULT_WRITE_QUEUE_NUMS);
193        assert_eq!(config.perm, PermName::PERM_READ | PermName::PERM_WRITE);
194        assert_eq!(config.topic_filter_type, TopicFilterType::SingleTag);
195        assert_eq!(config.topic_sys_flag, 0);
196        assert!(!config.order);
197        assert!(config.attributes.is_empty());
198    }
199
200    #[test]
201    fn new_topic_config() {
202        let topic_name = CheetahString::from("test_topic");
203        let config = TopicConfig::new(topic_name.clone());
204        assert_eq!(config.topic_name, Some(topic_name));
205    }
206
207    #[test]
208    fn with_queues_topic_config() {
209        let topic_name = CheetahString::from("test_topic");
210        let config = TopicConfig::with_queues(topic_name.clone(), 8, 8);
211        assert_eq!(config.topic_name, Some(topic_name));
212        assert_eq!(config.read_queue_nums, 8);
213        assert_eq!(config.write_queue_nums, 8);
214    }
215
216    #[test]
217    fn with_perm_topic_config() {
218        let topic_name = CheetahString::from("test_topic");
219        let config = TopicConfig::with_perm(topic_name.clone(), 8, 8, PermName::PERM_READ);
220        assert_eq!(config.topic_name, Some(topic_name));
221        assert_eq!(config.read_queue_nums, 8);
222        assert_eq!(config.write_queue_nums, 8);
223        assert_eq!(config.perm, PermName::PERM_READ);
224    }
225
226    #[test]
227    fn with_sys_flag_topic_config() {
228        let topic_name = CheetahString::from("test_topic");
229        let config = TopicConfig::with_sys_flag(topic_name.clone(), 8, 8, PermName::PERM_READ, 1);
230        assert_eq!(config.topic_name, Some(topic_name));
231        assert_eq!(config.read_queue_nums, 8);
232        assert_eq!(config.write_queue_nums, 8);
233        assert_eq!(config.perm, PermName::PERM_READ);
234        assert_eq!(config.topic_sys_flag, 1);
235    }
236
237    #[test]
238    fn encode_topic_config() {
239        let topic_name = CheetahString::from("test_topic");
240        let mut attributes = HashMap::new();
241        attributes.insert(CheetahString::from("key"), CheetahString::from("value"));
242        let config = TopicConfig {
243            topic_name: Some(topic_name.clone()),
244            read_queue_nums: 8,
245            write_queue_nums: 8,
246            perm: PermName::PERM_READ,
247            topic_filter_type: TopicFilterType::SingleTag,
248            topic_sys_flag: 1,
249            order: false,
250            attributes,
251        };
252        let encoded = config.encode();
253        assert!(encoded.contains("test_topic 8 8 4 SINGLE_TAG {\"key\":\"value\"}"));
254    }
255
256    #[test]
257    fn decode_topic_config() {
258        let mut config = TopicConfig::default();
259        let input = "test_topic 8 8 2 SingleTag {\"key\":\"value\"}";
260        let result = config.decode(input);
261        assert!(result);
262        assert_eq!(config.topic_name, Some(CheetahString::from("test_topic")));
263        assert_eq!(config.read_queue_nums, 8);
264        assert_eq!(config.write_queue_nums, 8);
265        assert_eq!(config.perm, PermName::PERM_WRITE);
266        assert_eq!(config.topic_filter_type, TopicFilterType::SingleTag);
267        assert_eq!(
268            config.attributes.get(&CheetahString::from("key")),
269            Some(&CheetahString::from("value"))
270        );
271    }
272
273    #[test]
274    fn get_topic_message_type_normal() {
275        let config = TopicConfig::default();
276        assert_eq!(config.get_topic_message_type(), TopicMessageType::Normal);
277    }
278
279    #[test]
280    fn get_topic_message_type_from_attributes() {
281        let mut config = TopicConfig::default();
282        config.attributes.insert(
283            CheetahString::from(TopicAttributes::topic_message_type_attribute().name()),
284            CheetahString::from("Normal"),
285        );
286        assert_eq!(config.get_topic_message_type(), TopicMessageType::Normal);
287    }
288}