use crate::proto::hdfs::DatanodeInfoProto;
/// Strategy used to decide whether a failed datanode in a write pipeline
/// should be replaced with a new one.
///
/// The variants are interpreted by `ReplaceDatanodeOnFailure::should_replace`.
/// Regardless of the policy, that method never asks for a replacement when
/// there are no surviving datanodes or when the pipeline already has at least
/// `replication` nodes.
///
/// `Eq` and `Hash` are derived in addition to `PartialEq` so the policy can be
/// used as a map/set key and in `Eq`-bounded generic code.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Policy {
    /// Never replace a datanode.
    ///
    /// NOTE(review): in this file `Disable` and `Never` are evaluated
    /// identically; presumably `Disable` mirrors the HDFS setting that turns
    /// the whole feature off — confirm against the code that builds the policy.
    Disable,
    /// Never replace a datanode.
    Never,
    /// Replace only when `replication >= 3` and either at most half of the
    /// replicas (`replication / 2`, integer division) remain, or the write is
    /// an append, or the data has been hflushed.
    Default,
    /// Always replace while the pipeline is non-empty and below `replication`.
    Always,
}
/// Configuration describing how a writer reacts when a datanode drops out of
/// its pipeline.
///
/// Both fields are private; instances are created through
/// `ReplaceDatanodeOnFailure::new` or `Default::default`.
#[derive(Debug, Clone)]
pub struct ReplaceDatanodeOnFailure {
    /// Rule consulted by `should_replace` to decide whether a replacement
    /// datanode is requested.
    policy: Policy,
    /// Flag exposed through `is_best_effort`.
    ///
    /// NOTE(review): presumably means the write may continue with the
    /// remaining datanodes when a replacement cannot be obtained — confirm
    /// against the caller; this file only stores and returns the value.
    best_effort: bool,
}
impl ReplaceDatanodeOnFailure {
    /// Creates a configuration from an explicit policy and best-effort flag.
    pub fn new(policy: Policy, best_effort: bool) -> Self {
        Self { policy, best_effort }
    }

    /// Returns the best-effort flag this configuration was built with.
    pub fn is_best_effort(&self) -> bool {
        self.best_effort
    }

    /// Decides whether a replacement datanode should be requested.
    ///
    /// * `replication` - target number of replicas for the block.
    /// * `existing_datanodes` - datanodes still present in the pipeline; only
    ///   the slice length is inspected.
    /// * `is_append` - caller-supplied flag, consulted only by
    ///   `Policy::Default`.
    /// * `is_hflushed` - caller-supplied flag, consulted only by
    ///   `Policy::Default`.
    ///
    /// Returns `false` for every policy when the pipeline is empty or already
    /// holds at least `replication` datanodes. Otherwise:
    ///
    /// * `Policy::Disable` / `Policy::Never` -> `false`
    /// * `Policy::Always` -> `true`
    /// * `Policy::Default` -> `true` only if `replication >= 3` and either at
    ///   most `replication / 2` datanodes remain or one of `is_append` /
    ///   `is_hflushed` is set.
    pub fn should_replace(
        &self,
        replication: u32,
        existing_datanodes: &[DatanodeInfoProto],
        is_append: bool,
        is_hflushed: bool,
    ) -> bool {
        let target = replication as usize;
        let remaining = existing_datanodes.len();

        // A replacement only makes sense for a pipeline that still has at
        // least one node but fewer than the requested replica count.
        let degraded = remaining > 0 && remaining < target;
        if !degraded {
            return false;
        }

        match self.policy {
            Policy::Always => true,
            Policy::Never | Policy::Disable => false,
            Policy::Default => {
                // Small replication factors never trigger a replacement; for
                // larger ones, replace once half or fewer replicas remain, or
                // whenever the write is an append / has been hflushed.
                replication >= 3 && (remaining <= target / 2 || is_append || is_hflushed)
            }
        }
    }
}
impl Default for ReplaceDatanodeOnFailure {
    /// Builds the default configuration: `Policy::Default` with the
    /// best-effort flag enabled.
    ///
    /// NOTE(review): the upstream HDFS client setting
    /// `dfs.client.block.write.replace-datanode-on-failure.best-effort`
    /// appears to default to `false`; confirm that `true` is intended here.
    fn default() -> Self {
        Self::new(Policy::Default, true)
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds `count` default-initialised datanode descriptors.
    fn datanodes(count: usize) -> Vec<DatanodeInfoProto> {
        (0..count).map(|_| DatanodeInfoProto::default()).collect()
    }

    /// Evaluates `should_replace` for a best-effort configuration using
    /// `policy`, with `existing` datanodes left in the pipeline.
    fn decide(
        policy: Policy,
        replication: u32,
        existing: usize,
        is_append: bool,
        is_hflushed: bool,
    ) -> bool {
        ReplaceDatanodeOnFailure::new(policy, true).should_replace(
            replication,
            &datanodes(existing),
            is_append,
            is_hflushed,
        )
    }

    #[test]
    fn test_should_replace_policy_disable() {
        // No pipeline size triggers a replacement when the feature is disabled.
        for existing in 0..=2 {
            assert!(
                !decide(Policy::Disable, 3, existing, false, false),
                "existing = {}",
                existing
            );
        }
    }

    #[test]
    fn test_should_replace_policy_never() {
        // `Never` rejects replacement for every pipeline size.
        for existing in 0..=2 {
            assert!(
                !decide(Policy::Never, 3, existing, false, false),
                "existing = {}",
                existing
            );
        }
    }

    #[test]
    fn test_should_replace_policy_always() {
        // Empty and full pipelines are excluded before the policy is consulted.
        assert!(!decide(Policy::Always, 3, 0, false, false));
        assert!(decide(Policy::Always, 3, 1, false, false));
        assert!(decide(Policy::Always, 3, 2, false, false));
        assert!(!decide(Policy::Always, 3, 3, false, false));
    }

    #[test]
    fn test_should_replace_policy_default() {
        // Replication below 3 never replaces.
        assert!(!decide(Policy::Default, 2, 1, false, false));
        // At most half of the replicas remain.
        assert!(decide(Policy::Default, 3, 1, false, false));
        // More than half remain: only append or hflush forces a replacement.
        assert!(!decide(Policy::Default, 3, 2, false, false));
        assert!(decide(Policy::Default, 3, 2, true, false));
        assert!(decide(Policy::Default, 3, 2, false, true));
    }
}