rocketmq_common/utils/
cleanup_policy_utils.rs1use std::str::FromStr;
19
20use rocketmq_rust::ArcMut;
21
22use crate::common::attribute::cleanup_policy::CleanupPolicy;
23use crate::common::attribute::Attribute;
24use crate::common::config::TopicConfig;
25use crate::TopicAttributes::TopicAttributes;
26
27pub fn is_compaction(topic_config: Option<&TopicConfig>) -> bool {
28 match topic_config {
29 Some(config) => CleanupPolicy::COMPACTION == get_delete_policy(Some(config)),
30 None => false,
31 }
32}
33
34pub fn get_delete_policy(topic_config: Option<&TopicConfig>) -> CleanupPolicy {
35 match topic_config {
36 Some(config) => {
37 let attribute_name = TopicAttributes::cleanup_policy_attribute().name();
38 match config.attributes.get(attribute_name) {
39 Some(value) => CleanupPolicy::from_str(value.as_str()).unwrap(),
40 None => CleanupPolicy::from_str(
41 TopicAttributes::cleanup_policy_attribute().default_value(),
42 )
43 .unwrap(),
44 }
45 }
46 None => {
47 CleanupPolicy::from_str(TopicAttributes::cleanup_policy_attribute().default_value())
48 .unwrap()
49 }
50 }
51}
52
53pub fn get_delete_policy_arc_mut(topic_config: Option<&ArcMut<TopicConfig>>) -> CleanupPolicy {
54 match topic_config {
55 Some(config) => {
56 let attribute_name = TopicAttributes::cleanup_policy_attribute().name();
57 match config.attributes.get(attribute_name) {
58 Some(value) => CleanupPolicy::from_str(value.as_str()).unwrap(),
59 None => CleanupPolicy::from_str(
60 TopicAttributes::cleanup_policy_attribute().default_value(),
61 )
62 .unwrap(),
63 }
64 }
65 None => {
66 CleanupPolicy::from_str(TopicAttributes::cleanup_policy_attribute().default_value())
67 .unwrap()
68 }
69 }
70}
71
#[cfg(test)]
mod tests {
    use super::*;
    use crate::common::attribute::cleanup_policy::CleanupPolicy;
    use crate::common::config::TopicConfig;

    /// Builds a default `TopicConfig` whose cleanup-policy attribute is set to
    /// the string form of `policy`.
    fn config_with_policy(policy: CleanupPolicy) -> TopicConfig {
        let mut config = TopicConfig::default();
        config.attributes.insert(
            TopicAttributes::cleanup_policy_attribute().name().into(),
            policy.to_string().into(),
        );
        config
    }

    /// Parses the default value declared by the cleanup-policy attribute.
    fn default_policy() -> CleanupPolicy {
        CleanupPolicy::from_str(TopicAttributes::cleanup_policy_attribute().default_value())
            .unwrap()
    }

    #[test]
    fn is_compaction_returns_true_when_cleanup_policy_is_compaction() {
        let config = config_with_policy(CleanupPolicy::COMPACTION);
        assert!(is_compaction(Some(&config)));
    }

    #[test]
    fn is_compaction_returns_false_when_cleanup_policy_is_not_compaction() {
        let config = config_with_policy(CleanupPolicy::DELETE);
        assert!(!is_compaction(Some(&config)));
    }

    #[test]
    fn is_compaction_returns_false_when_topic_config_is_none() {
        assert!(!is_compaction(None));
    }

    #[test]
    fn get_delete_policy_returns_cleanup_policy_from_topic_config() {
        let config = config_with_policy(CleanupPolicy::DELETE);
        assert_eq!(get_delete_policy(Some(&config)), CleanupPolicy::DELETE);
    }

    #[test]
    fn get_delete_policy_returns_default_cleanup_policy_when_not_set_in_topic_config() {
        // A freshly defaulted config is used with no cleanup-policy entry added.
        let config = TopicConfig::default();
        assert_eq!(get_delete_policy(Some(&config)), default_policy());
    }

    #[test]
    fn get_delete_policy_returns_default_cleanup_policy_when_topic_config_is_none() {
        assert_eq!(get_delete_policy(None), default_policy());
    }
}