rocketmq_common/common/
config.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18use std::collections::HashMap;
19use std::fmt::Display;
20
21use cheetah_string::CheetahString;
22use serde::Deserialize;
23use serde::Serialize;
24
25use super::TopicFilterType;
26use crate::common::attribute::topic_message_type::TopicMessageType;
27use crate::common::attribute::Attribute;
28use crate::common::constant::PermName;
29use crate::TopicAttributes::TopicAttributes;
30
31#[derive(Clone, Serialize, Deserialize, Debug, PartialEq)]
32#[serde(rename_all = "camelCase")]
33pub struct TopicConfig {
34    pub topic_name: Option<CheetahString>,
35    pub read_queue_nums: u32,
36    pub write_queue_nums: u32,
37    pub perm: u32,
38    pub topic_filter_type: TopicFilterType,
39    pub topic_sys_flag: u32,
40    pub order: bool,
41    pub attributes: HashMap<CheetahString, CheetahString>,
42}
43
44impl Display for TopicConfig {
45    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
46        write!(
47            f,
48            "TopicConfig {{ topic_name: {:?}, read_queue_nums: {}, write_queue_nums: {}, perm: \
49             {}, topic_filter_type: {}, topic_sys_flag: {}, order: {}, attributes: {:?} }}",
50            self.topic_name,
51            self.read_queue_nums,
52            self.write_queue_nums,
53            self.perm,
54            self.topic_filter_type,
55            self.topic_sys_flag,
56            self.order,
57            self.attributes
58        )
59    }
60}
61
62impl Default for TopicConfig {
63    fn default() -> Self {
64        Self {
65            topic_name: None,
66            read_queue_nums: Self::DEFAULT_READ_QUEUE_NUMS,
67            write_queue_nums: Self::DEFAULT_WRITE_QUEUE_NUMS,
68            perm: PermName::PERM_READ | PermName::PERM_WRITE,
69            topic_filter_type: TopicFilterType::SingleTag,
70            topic_sys_flag: 0,
71            order: false,
72            attributes: HashMap::new(),
73        }
74    }
75}
76
77impl TopicConfig {
78    const DEFAULT_READ_QUEUE_NUMS: u32 = 16;
79    const DEFAULT_WRITE_QUEUE_NUMS: u32 = 16;
80    const SEPARATOR: &'static str = " ";
81
82    pub fn get_topic_message_type(&self) -> TopicMessageType {
83        if self.attributes.is_empty() {
84            return TopicMessageType::Normal;
85        }
86        let content = self
87            .attributes
88            .get(TopicAttributes::topic_message_type_attribute().name());
89        if let Some(content) = content {
90            return TopicMessageType::from(content.to_string());
91        }
92        TopicMessageType::Normal
93    }
94
95    pub fn new(topic_name: impl Into<CheetahString>) -> Self {
96        TopicConfig {
97            topic_name: Some(topic_name.into()),
98            ..Self::default()
99        }
100    }
101
102    pub fn with_queues(
103        topic_name: impl Into<CheetahString>,
104        read_queue_nums: u32,
105        write_queue_nums: u32,
106    ) -> Self {
107        Self {
108            read_queue_nums,
109            write_queue_nums,
110            ..Self::new(topic_name)
111        }
112    }
113
114    pub fn with_perm(
115        topic_name: impl Into<CheetahString>,
116        read_queue_nums: u32,
117        write_queue_nums: u32,
118        perm: u32,
119    ) -> Self {
120        Self {
121            read_queue_nums,
122            write_queue_nums,
123            perm,
124            ..Self::new(topic_name)
125        }
126    }
127
128    pub fn with_sys_flag(
129        topic_name: impl Into<CheetahString>,
130        read_queue_nums: u32,
131        write_queue_nums: u32,
132        perm: u32,
133        topic_sys_flag: u32,
134    ) -> Self {
135        Self {
136            read_queue_nums,
137            write_queue_nums,
138            perm,
139            topic_sys_flag,
140            ..Self::new(topic_name)
141        }
142    }
143
144    pub fn encode(&self) -> String {
145        let mut sb = String::new();
146        sb.push_str(
147            self.topic_name
148                .clone()
149                .unwrap_or(CheetahString::empty())
150                .as_str(),
151        );
152        sb.push_str(Self::SEPARATOR);
153        sb.push_str(&self.read_queue_nums.to_string());
154        sb.push_str(Self::SEPARATOR);
155        sb.push_str(&self.write_queue_nums.to_string());
156        sb.push_str(Self::SEPARATOR);
157        sb.push_str(&self.perm.to_string());
158        sb.push_str(Self::SEPARATOR);
159        sb.push_str(&format!("{}", self.topic_filter_type));
160        sb.push_str(Self::SEPARATOR);
161        if !self.attributes.is_empty() {
162            sb.push_str(&serde_json::to_string(&self.attributes).unwrap());
163        }
164        sb.trim().to_string()
165    }
166
167    pub fn decode(&mut self, input: &str) -> bool {
168        let parts: Vec<&str> = input.split(Self::SEPARATOR).collect();
169        if parts.len() >= 5 {
170            self.topic_name = Some(parts[0].into());
171            self.read_queue_nums = parts[1].parse().unwrap_or(Self::DEFAULT_READ_QUEUE_NUMS);
172            self.write_queue_nums = parts[2].parse().unwrap_or(Self::DEFAULT_WRITE_QUEUE_NUMS);
173            self.perm = parts[3]
174                .parse()
175                .unwrap_or(PermName::PERM_READ | PermName::PERM_WRITE);
176            self.topic_filter_type = From::from(parts[4]);
177            if parts.len() >= 6 {
178                if let Ok(attrs) = serde_json::from_str(parts[5]) {
179                    self.attributes = attrs
180                }
181            }
182            true
183        } else {
184            false
185        }
186    }
187
188    pub fn get_read_queue_nums(&self) -> u32 {
189        self.read_queue_nums
190    }
191}
192
193#[cfg(test)]
194mod tests {
195    use std::collections::HashMap;
196
197    use cheetah_string::CheetahString;
198
199    use super::*;
200
201    #[test]
202    fn default_topic_config() {
203        let config = TopicConfig::default();
204        assert_eq!(config.topic_name, None);
205        assert_eq!(config.read_queue_nums, TopicConfig::DEFAULT_READ_QUEUE_NUMS);
206        assert_eq!(
207            config.write_queue_nums,
208            TopicConfig::DEFAULT_WRITE_QUEUE_NUMS
209        );
210        assert_eq!(config.perm, PermName::PERM_READ | PermName::PERM_WRITE);
211        assert_eq!(config.topic_filter_type, TopicFilterType::SingleTag);
212        assert_eq!(config.topic_sys_flag, 0);
213        assert!(!config.order);
214        assert!(config.attributes.is_empty());
215    }
216
217    #[test]
218    fn new_topic_config() {
219        let topic_name = CheetahString::from("test_topic");
220        let config = TopicConfig::new(topic_name.clone());
221        assert_eq!(config.topic_name, Some(topic_name));
222    }
223
224    #[test]
225    fn with_queues_topic_config() {
226        let topic_name = CheetahString::from("test_topic");
227        let config = TopicConfig::with_queues(topic_name.clone(), 8, 8);
228        assert_eq!(config.topic_name, Some(topic_name));
229        assert_eq!(config.read_queue_nums, 8);
230        assert_eq!(config.write_queue_nums, 8);
231    }
232
233    #[test]
234    fn with_perm_topic_config() {
235        let topic_name = CheetahString::from("test_topic");
236        let config = TopicConfig::with_perm(topic_name.clone(), 8, 8, PermName::PERM_READ);
237        assert_eq!(config.topic_name, Some(topic_name));
238        assert_eq!(config.read_queue_nums, 8);
239        assert_eq!(config.write_queue_nums, 8);
240        assert_eq!(config.perm, PermName::PERM_READ);
241    }
242
243    #[test]
244    fn with_sys_flag_topic_config() {
245        let topic_name = CheetahString::from("test_topic");
246        let config = TopicConfig::with_sys_flag(topic_name.clone(), 8, 8, PermName::PERM_READ, 1);
247        assert_eq!(config.topic_name, Some(topic_name));
248        assert_eq!(config.read_queue_nums, 8);
249        assert_eq!(config.write_queue_nums, 8);
250        assert_eq!(config.perm, PermName::PERM_READ);
251        assert_eq!(config.topic_sys_flag, 1);
252    }
253
254    #[test]
255    fn encode_topic_config() {
256        let topic_name = CheetahString::from("test_topic");
257        let mut attributes = HashMap::new();
258        attributes.insert(CheetahString::from("key"), CheetahString::from("value"));
259        let config = TopicConfig {
260            topic_name: Some(topic_name.clone()),
261            read_queue_nums: 8,
262            write_queue_nums: 8,
263            perm: PermName::PERM_READ,
264            topic_filter_type: TopicFilterType::SingleTag,
265            topic_sys_flag: 1,
266            order: false,
267            attributes,
268        };
269        let encoded = config.encode();
270        assert!(encoded.contains("test_topic 8 8 4 SINGLE_TAG {\"key\":\"value\"}"));
271    }
272
273    #[test]
274    fn decode_topic_config() {
275        let mut config = TopicConfig::default();
276        let input = "test_topic 8 8 2 SingleTag {\"key\":\"value\"}";
277        let result = config.decode(input);
278        assert!(result);
279        assert_eq!(config.topic_name, Some(CheetahString::from("test_topic")));
280        assert_eq!(config.read_queue_nums, 8);
281        assert_eq!(config.write_queue_nums, 8);
282        assert_eq!(config.perm, PermName::PERM_WRITE);
283        assert_eq!(config.topic_filter_type, TopicFilterType::SingleTag);
284        assert_eq!(
285            config.attributes.get(&CheetahString::from("key")),
286            Some(&CheetahString::from("value"))
287        );
288    }
289
290    #[test]
291    fn get_topic_message_type_normal() {
292        let config = TopicConfig::default();
293        assert_eq!(config.get_topic_message_type(), TopicMessageType::Normal);
294    }
295
296    #[test]
297    fn get_topic_message_type_from_attributes() {
298        let mut config = TopicConfig::default();
299        config.attributes.insert(
300            CheetahString::from(TopicAttributes::topic_message_type_attribute().name()),
301            CheetahString::from("Normal"),
302        );
303        assert_eq!(config.get_topic_message_type(), TopicMessageType::Normal);
304    }
305}