rocketmq_common/common/attribute/
attribute_util.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17use std::collections::HashMap;
18use std::collections::HashSet;
19use std::string::ToString;
20use std::sync::Arc;
21
22use cheetah_string::CheetahString;
23use tracing::info;
24
25use crate::common::attribute::Attribute;
26
/// Errors produced by [`AttributeUtil`] when a set of attribute change requests is rejected.
///
/// The `String` payloads hold the offending key (or, for
/// [`AttributeError::AttributeVerificationFailed`], a pre-formatted description) and are
/// interpolated into the `Display` message.
#[derive(Debug, thiserror::Error)]
pub enum AttributeError {
    /// A request without the `+` prefix was submitted in create mode.
    /// Carries the key with its prefix removed.
    #[error("Only add attribute is supported while creating topic. Key: {0}")]
    CreateOnlySupportsAdd(String),

    /// A `-` request named a key that is absent from the current attributes.
    /// Carries the key with its prefix removed.
    #[error("Attempt to delete a nonexistent key: {0}")]
    DeleteNonexistentKey(String),

    /// The requested key does not start with `+` or `-`.
    #[error("Wrong format key: {0}")]
    WrongFormatKey(String),

    /// The same unprefixed key appeared more than once in a single request
    /// (for example both `+k` and `-k`).
    #[error("Alter duplication key. Key: {0}")]
    DuplicateKey(String),

    /// The key is too short to hold a prefix and a name, or the unprefixed key is empty
    /// or contains `+` / `-`.
    #[error("KV string format wrong")]
    KvStringFormatWrong,

    /// The key is not present in the map of supported attributes.
    #[error("Unsupported key: {0}")]
    UnsupportedKey(String),

    /// An add, update or delete outside create mode targeted an attribute whose
    /// `is_changeable()` returned `false`.
    #[error("Attempt to update an unchangeable attribute. Key: {0}")]
    UnchangeableAttribute(String),

    /// The attribute's `verify` call rejected the requested value.
    /// The payload has the form `Key: <key>, Error: <verification error>`.
    #[error("Attribute verification failed: {0}")]
    AttributeVerificationFailed(String),
}
53
/// Utility for working with topic attributes.
///
/// This is a unit struct used purely as a namespace: it holds no state and all of its
/// functionality is exposed through associated functions.
pub struct AttributeUtil;
56
57impl AttributeUtil {
58    /// Alter the current attributes based on new attribute requests
59    ///
60    /// # Arguments
61    ///
62    /// * `create` - If true, we're creating new attributes, otherwise updating existing ones
63    /// * `all` - Map of all supported attributes
64    /// * `current_attributes` - Current attribute values
65    /// * `new_attributes` - New attribute operations (prefixed with + or -)
66    ///
67    /// # Returns
68    ///
69    /// A Result containing the final attribute map or an error
70    #[allow(clippy::map_entry)]
71    pub fn alter_current_attributes(
72        create: bool,
73        all: &HashMap<CheetahString, Arc<dyn Attribute>>,
74        current_attributes: &HashMap<CheetahString, CheetahString>,
75        new_attributes: &HashMap<CheetahString, CheetahString>,
76    ) -> Result<HashMap<CheetahString, CheetahString>, AttributeError> {
77        let mut init = HashMap::new();
78        let mut add = HashMap::new();
79        let mut update = HashMap::new();
80        let mut delete = HashMap::new();
81        let mut keys = HashSet::new();
82
83        // Process new attribute operations
84        for (key, value) in new_attributes {
85            let real_key = Self::real_key(key)?;
86
87            Self::validate(&real_key)?;
88            Self::duplication_check(&mut keys, &real_key)?;
89
90            if create {
91                if key.starts_with('+') {
92                    init.insert(real_key, value.clone());
93                } else {
94                    return Err(AttributeError::CreateOnlySupportsAdd(real_key.to_string()));
95                }
96            } else if key.starts_with('+') {
97                if !current_attributes.contains_key(&real_key) {
98                    add.insert(real_key, value.clone());
99                } else {
100                    update.insert(real_key, value.clone());
101                }
102            } else if key.starts_with('-') {
103                if !current_attributes.contains_key(&real_key) {
104                    return Err(AttributeError::DeleteNonexistentKey(real_key.to_string()));
105                }
106                delete.insert(real_key, value.clone());
107            } else {
108                return Err(AttributeError::WrongFormatKey(real_key.to_string()));
109            }
110        }
111
112        // Validate all operations
113        Self::validate_alter(all, &init, true, false)?;
114        Self::validate_alter(all, &add, false, false)?;
115        Self::validate_alter(all, &update, false, false)?;
116        Self::validate_alter(all, &delete, false, true)?;
117
118        info!("add: {:?}, update: {:?}, delete: {:?}", add, update, delete);
119
120        // Create final attribute map
121        let mut final_attributes = current_attributes.clone();
122
123        // Apply changes
124        for (k, v) in init {
125            final_attributes.insert(k, v);
126        }
127
128        for (k, v) in add {
129            final_attributes.insert(k, v);
130        }
131
132        for (k, v) in update {
133            final_attributes.insert(k, v);
134        }
135
136        for k in delete.keys() {
137            final_attributes.remove(k);
138        }
139
140        Ok(final_attributes)
141    }
142
143    /// Check for key duplication in the operation set
144    fn duplication_check(keys: &mut HashSet<String>, key: &str) -> Result<(), AttributeError> {
145        if !keys.insert(key.to_string()) {
146            return Err(AttributeError::DuplicateKey(key.to_string()));
147        }
148        Ok(())
149    }
150
151    /// Validate attribute key format
152    fn validate(kv_attribute: &str) -> Result<(), AttributeError> {
153        if kv_attribute.is_empty() {
154            return Err(AttributeError::KvStringFormatWrong);
155        }
156
157        if kv_attribute.contains('+') {
158            return Err(AttributeError::KvStringFormatWrong);
159        }
160
161        if kv_attribute.contains('-') {
162            return Err(AttributeError::KvStringFormatWrong);
163        }
164
165        Ok(())
166    }
167
168    /// Validate attribute operations
169    fn validate_alter(
170        all: &HashMap<CheetahString, Arc<dyn Attribute>>,
171        alter: &HashMap<CheetahString, CheetahString>,
172        init: bool,
173        delete: bool,
174    ) -> Result<(), AttributeError> {
175        for (key, value) in alter {
176            let attribute = match all.get(key) {
177                Some(attr) => attr,
178                None => return Err(AttributeError::UnsupportedKey(key.to_string())),
179            };
180
181            if !init && !attribute.is_changeable() {
182                return Err(AttributeError::UnchangeableAttribute(key.to_string()));
183            }
184
185            if !delete {
186                attribute.verify(value).map_err(|e| {
187                    AttributeError::AttributeVerificationFailed(format!("Key: {key}, Error: {e}"))
188                })?;
189            }
190        }
191
192        Ok(())
193    }
194
195    /// Extract the real key by removing the prefix (+ or -)
196    fn real_key(key: &str) -> Result<CheetahString, AttributeError> {
197        if key.len() < 2 {
198            return Err(AttributeError::KvStringFormatWrong);
199        }
200
201        let prefix = key.chars().next().unwrap();
202        if prefix != '+' && prefix != '-' {
203            return Err(AttributeError::WrongFormatKey(key.to_string()));
204        }
205        Ok(key[1..].to_string().into())
206    }
207}
208
#[cfg(test)]
mod tests {
    // Everything the tests need (`HashMap`, `CheetahString`, `AttributeUtil`, ...) is
    // reachable through the parent module.
    use super::*;

    type AttributeMap = HashMap<CheetahString, CheetahString>;

    /// Builds an attribute map from literal key/value pairs.
    fn attribute_map(pairs: &[(&'static str, &'static str)]) -> AttributeMap {
        let mut map = AttributeMap::new();
        for &(key, value) in pairs {
            map.insert(key.into(), value.into());
        }
        map
    }

    /// Runs `alter_current_attributes` with an empty set of supported attributes.
    fn alter(
        create: bool,
        current: &[(&'static str, &'static str)],
        requested: &[(&'static str, &'static str)],
    ) -> Result<AttributeMap, AttributeError> {
        let all = HashMap::new();
        AttributeUtil::alter_current_attributes(
            create,
            &all,
            &attribute_map(current),
            &attribute_map(requested),
        )
    }

    #[test]
    fn alter_current_attributes_create_only_supports_add() {
        let error = alter(true, &[], &[("-key1", "")]).unwrap_err();
        assert_eq!(
            error.to_string(),
            "Only add attribute is supported while creating topic. Key: key1"
        );
    }

    #[test]
    fn alter_current_attributes_delete_nonexistent_key() {
        let error = alter(false, &[], &[("-key1", "")]).unwrap_err();
        assert_eq!(
            error.to_string(),
            "Attempt to delete a nonexistent key: key1"
        );
    }

    #[test]
    fn alter_current_attributes_wrong_format_key() {
        let error = alter(false, &[], &[("key1", "value1")]).unwrap_err();
        assert_eq!(error.to_string(), "Wrong format key: key1");
    }

    #[test]
    fn alter_current_attributes_duplicate_key() {
        // Whichever request is visited second trips the duplicate check, so the
        // outcome does not depend on map iteration order.
        let error = alter(
            false,
            &[("key1", "value1")],
            &[("+key1", "value2"), ("-key1", "")],
        )
        .unwrap_err();
        assert_eq!(error.to_string(), "Alter duplication key. Key: key1");
    }

    #[test]
    fn alter_current_attributes_prefix_without_name() {
        let error = alter(false, &[], &[("+", "value1")]).unwrap_err();
        assert_eq!(error.to_string(), "KV string format wrong");
    }

    #[test]
    fn alter_current_attributes_key_contains_operator() {
        let error = alter(false, &[], &[("+key-1", "value1")]).unwrap_err();
        assert_eq!(error.to_string(), "KV string format wrong");
    }

    #[test]
    fn alter_current_attributes_unsupported_key() {
        let error = alter(true, &[], &[("+key1", "value1")]).unwrap_err();
        assert_eq!(error.to_string(), "Unsupported key: key1");
    }

    #[test]
    fn alter_current_attributes_without_requests_keeps_current() {
        let current = [("key1", "value1")];
        let result = alter(false, &current, &[]).unwrap();
        assert_eq!(result, attribute_map(&current));
    }
}
271}