#[derive(Debug, Clone)]
pub(crate) struct ReplicaView {
pub node_id: i32,
pub rack: Option<String>,
pub in_isr: bool,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ReplicaSelectorKind {
#[default]
Leader,
RackAware,
}
impl ReplicaSelectorKind {
pub fn from_config_str(s: &str) -> Result<Self, String> {
match s.trim() {
"leader" => Ok(Self::Leader),
"rack-aware" => Ok(Self::RackAware),
other => Err(other.to_string()),
}
}
pub(crate) fn select(
self,
client_rack: Option<&str>,
leader_id: i32,
replicas: &[ReplicaView],
) -> i32 {
match self {
Self::Leader => -1,
Self::RackAware => {
let Some(rack) = client_rack.filter(|r| !r.is_empty()) else {
return -1;
};
let winner = replicas
.iter()
.filter(|r| r.in_isr && r.rack.as_deref() == Some(rack))
.min_by_key(|r| r.node_id);
match winner {
Some(r) if r.node_id != leader_id => r.node_id,
_ => -1,
}
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use assert2::assert;
fn view(node_id: i32, rack: &str, in_isr: bool) -> ReplicaView {
ReplicaView {
node_id,
rack: Some(rack.to_string()),
in_isr,
}
}
#[test]
fn parse_known_values() {
assert!(ReplicaSelectorKind::from_config_str("leader") == Ok(ReplicaSelectorKind::Leader));
assert!(
ReplicaSelectorKind::from_config_str("rack-aware")
== Ok(ReplicaSelectorKind::RackAware)
);
assert!(ReplicaSelectorKind::from_config_str("bogus").is_err());
}
#[test]
fn leader_kind_always_returns_minus_one() {
let replicas = [view(1, "a", true), view(2, "b", true)];
assert!(ReplicaSelectorKind::Leader.select(Some("b"), 1, &replicas) == -1);
}
#[test]
fn rack_aware_picks_same_rack_isr_member() {
let replicas = [view(1, "a", true), view(2, "b", true), view(3, "b", true)];
assert!(ReplicaSelectorKind::RackAware.select(Some("b"), 1, &replicas) == 2);
}
#[test]
fn rack_aware_none_when_client_rack_missing() {
let replicas = [view(1, "a", true), view(2, "b", true)];
assert!(ReplicaSelectorKind::RackAware.select(None, 1, &replicas) == -1);
assert!(ReplicaSelectorKind::RackAware.select(Some(""), 1, &replicas) == -1);
}
#[test]
fn rack_aware_none_when_no_same_rack_replica() {
let replicas = [view(1, "a", true), view(2, "a", true)];
assert!(ReplicaSelectorKind::RackAware.select(Some("z"), 1, &replicas) == -1);
}
#[test]
fn rack_aware_ignores_non_isr_same_rack_replica() {
let replicas = [view(1, "a", true), view(2, "b", false)];
assert!(ReplicaSelectorKind::RackAware.select(Some("b"), 1, &replicas) == -1);
}
#[test]
fn rack_aware_none_when_only_same_rack_replica_is_leader() {
let replicas = [view(1, "b", true), view(2, "a", true)];
assert!(ReplicaSelectorKind::RackAware.select(Some("b"), 1, &replicas) == -1);
}
}