use std::str::FromStr;
use crate::{
common::{attribute::cleanup_policy::CleanupPolicy, config::TopicConfig},
TopicAttributes,
};
/// Reports whether the given topic uses the compaction cleanup policy.
///
/// Returns `true` only when `topic_config` is `Some` and the policy resolved
/// by [`get_delete_policy`] for that config equals `CleanupPolicy::COMPACTION`.
/// A missing config (`None`) always yields `false`; the default policy is not
/// consulted in that case.
pub fn is_compaction(topic_config: &Option<TopicConfig>) -> bool {
    matches!(
        topic_config,
        Some(config) if CleanupPolicy::COMPACTION == get_delete_policy(Some(config))
    )
}
/// Resolves the cleanup policy that applies to a topic.
///
/// The value stored in the config's `attributes` map under the cleanup-policy
/// attribute name is parsed when present. When `topic_config` is `None`, or
/// the config carries no such attribute, the attribute's default value is
/// parsed instead.
///
/// # Panics
///
/// Panics if the selected string (configured or default) cannot be parsed
/// into a `CleanupPolicy`.
pub fn get_delete_policy(topic_config: Option<&TopicConfig>) -> CleanupPolicy {
    // Look up the explicitly configured value, if there is a config at all.
    let configured = topic_config.and_then(|config| {
        config
            .attributes
            .get(TopicAttributes::CLEANUP_POLICY_ATTRIBUTE.get_name())
    });

    let parsed = match configured {
        Some(value) => CleanupPolicy::from_str(value.as_str()),
        // No config, or no attribute on it: use the attribute's declared default.
        None => {
            CleanupPolicy::from_str(TopicAttributes::CLEANUP_POLICY_ATTRIBUTE.get_default_value())
        }
    };
    parsed.unwrap()
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::common::{attribute::cleanup_policy::CleanupPolicy, config::TopicConfig};

    /// Builds a default `TopicConfig` whose cleanup-policy attribute is set to `policy`.
    fn topic_config_with_policy(policy: CleanupPolicy) -> TopicConfig {
        let mut topic_config = TopicConfig::default();
        topic_config.attributes.insert(
            TopicAttributes::CLEANUP_POLICY_ATTRIBUTE
                .get_name()
                .to_string(),
            policy.to_string(),
        );
        topic_config
    }

    /// Parses the cleanup-policy attribute's declared default value.
    fn default_cleanup_policy() -> CleanupPolicy {
        CleanupPolicy::from_str(TopicAttributes::CLEANUP_POLICY_ATTRIBUTE.get_default_value())
            .unwrap()
    }

    #[test]
    fn is_compaction_returns_true_when_cleanup_policy_is_compaction() {
        let topic_config = topic_config_with_policy(CleanupPolicy::COMPACTION);
        assert!(is_compaction(&Some(topic_config)));
    }

    #[test]
    fn is_compaction_returns_false_when_cleanup_policy_is_not_compaction() {
        let topic_config = topic_config_with_policy(CleanupPolicy::DELETE);
        assert!(!is_compaction(&Some(topic_config)));
    }

    #[test]
    fn is_compaction_returns_false_when_topic_config_is_none() {
        assert!(!is_compaction(&None));
    }

    #[test]
    fn get_delete_policy_returns_cleanup_policy_from_topic_config() {
        let topic_config = topic_config_with_policy(CleanupPolicy::DELETE);
        assert_eq!(
            get_delete_policy(Some(&topic_config)),
            CleanupPolicy::DELETE
        );
    }

    #[test]
    fn get_delete_policy_returns_default_cleanup_policy_when_not_set_in_topic_config() {
        let topic_config = TopicConfig::default();
        assert_eq!(
            get_delete_policy(Some(&topic_config)),
            default_cleanup_policy()
        );
    }

    #[test]
    fn get_delete_policy_returns_default_cleanup_policy_when_topic_config_is_none() {
        assert_eq!(get_delete_policy(None), default_cleanup_policy());
    }
}