rocketmq_common/utils/
cleanup_policy_utils.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18use std::str::FromStr;
19
20use rocketmq_rust::ArcMut;
21
22use crate::common::attribute::cleanup_policy::CleanupPolicy;
23use crate::common::attribute::Attribute;
24use crate::common::config::TopicConfig;
25use crate::TopicAttributes::TopicAttributes;
26
27pub fn is_compaction(topic_config: Option<&TopicConfig>) -> bool {
28    match topic_config {
29        Some(config) => CleanupPolicy::COMPACTION == get_delete_policy(Some(config)),
30        None => false,
31    }
32}
33
34pub fn get_delete_policy(topic_config: Option<&TopicConfig>) -> CleanupPolicy {
35    match topic_config {
36        Some(config) => {
37            let attribute_name = TopicAttributes::cleanup_policy_attribute().name();
38            match config.attributes.get(attribute_name) {
39                Some(value) => CleanupPolicy::from_str(value.as_str()).unwrap(),
40                None => CleanupPolicy::from_str(
41                    TopicAttributes::cleanup_policy_attribute().default_value(),
42                )
43                .unwrap(),
44            }
45        }
46        None => {
47            CleanupPolicy::from_str(TopicAttributes::cleanup_policy_attribute().default_value())
48                .unwrap()
49        }
50    }
51}
52
53pub fn get_delete_policy_arc_mut(topic_config: Option<&ArcMut<TopicConfig>>) -> CleanupPolicy {
54    match topic_config {
55        Some(config) => {
56            let attribute_name = TopicAttributes::cleanup_policy_attribute().name();
57            match config.attributes.get(attribute_name) {
58                Some(value) => CleanupPolicy::from_str(value.as_str()).unwrap(),
59                None => CleanupPolicy::from_str(
60                    TopicAttributes::cleanup_policy_attribute().default_value(),
61                )
62                .unwrap(),
63            }
64        }
65        None => {
66            CleanupPolicy::from_str(TopicAttributes::cleanup_policy_attribute().default_value())
67                .unwrap()
68        }
69    }
70}
71
72#[cfg(test)]
73mod tests {
74    use super::*;
75    use crate::common::attribute::cleanup_policy::CleanupPolicy;
76    use crate::common::config::TopicConfig;
77
78    #[test]
79    fn is_compaction_returns_true_when_cleanup_policy_is_compaction() {
80        let mut topic_config = TopicConfig::default();
81        topic_config.attributes.insert(
82            TopicAttributes::cleanup_policy_attribute().name().into(),
83            CleanupPolicy::COMPACTION.to_string().into(),
84        );
85        assert!(is_compaction(Some(&topic_config)));
86    }
87
88    #[test]
89    fn is_compaction_returns_false_when_cleanup_policy_is_not_compaction() {
90        let mut topic_config = TopicConfig::default();
91        topic_config.attributes.insert(
92            TopicAttributes::cleanup_policy_attribute()
93                .name()
94                .to_string()
95                .into(),
96            CleanupPolicy::DELETE.to_string().into(),
97        );
98        assert!(!is_compaction(Some(&topic_config)));
99    }
100
101    #[test]
102    fn is_compaction_returns_false_when_topic_config_is_none() {
103        assert!(!is_compaction(None));
104    }
105
106    #[test]
107    fn get_delete_policy_returns_cleanup_policy_from_topic_config() {
108        let mut topic_config = TopicConfig::default();
109        topic_config.attributes.insert(
110            TopicAttributes::cleanup_policy_attribute()
111                .name()
112                .to_string()
113                .into(),
114            CleanupPolicy::DELETE.to_string().into(),
115        );
116        assert_eq!(
117            get_delete_policy(Some(&topic_config)),
118            CleanupPolicy::DELETE
119        );
120    }
121
122    #[test]
123    fn get_delete_policy_returns_default_cleanup_policy_when_not_set_in_topic_config() {
124        let topic_config = TopicConfig::default();
125        assert_eq!(
126            get_delete_policy(Some(&topic_config)),
127            CleanupPolicy::from_str(TopicAttributes::cleanup_policy_attribute().default_value())
128                .unwrap()
129        );
130    }
131
132    #[test]
133    fn get_delete_policy_returns_default_cleanup_policy_when_topic_config_is_none() {
134        assert_eq!(
135            get_delete_policy(None),
136            CleanupPolicy::from_str(TopicAttributes::cleanup_policy_attribute().default_value())
137                .unwrap()
138        );
139    }
140}