use std::collections::HashSet;
use std::fmt;
use std::slice::Iter;
use kf_protocol::derive::{Decode, Encode};
use kf_protocol::api::Offset;
use types::SpuId;
use k8_metadata::partition::PartitionStatus as K8PartitionStatus;
use k8_metadata::partition::ReplicaStatus as K8ReplicaStatus;
use k8_metadata::partition::PartitionResolution as K8PartitionResolution;
use super::ElectionPolicy;
use super::ElectionScoring;
/// Status of a partition: its resolution, the leader's replica status and the
/// statuses of the other replicas being tracked.
#[derive(Decode, Encode, Default, Debug, Clone, PartialEq)]
pub struct PartitionStatus {
    /// Current resolution of the partition.
    pub resolution: PartitionResolution,
    /// Replica status of the leader.
    pub leader: ReplicaStatus,
    /// Count of replicas whose `leo` equalled the leader's `leo` when `merge`
    /// last ran; it stays at the default (0) until then.
    lsr: u32,
    /// Replica statuses tracked next to the leader.
    replicas: Vec<ReplicaStatus>,
}
impl fmt::Display for PartitionStatus {
    /// Writes the resolution (pretty `Debug` form), then the leader, then every
    /// replica followed by a comma, all replicas enclosed in square brackets.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "{:#?} Leader: {} [", self.resolution, self.leader)?;
        self.replicas
            .iter()
            .try_for_each(|replica| write!(f, "{},", replica))?;
        f.write_str("]")
    }
}
impl PartitionStatus {
    /// Creates a status that has a leader and no other replicas.
    pub fn leader<L>(leader: L) -> Self
    where
        L: Into<ReplicaStatus>,
    {
        Self::new(leader, Vec::new())
    }

    /// Creates a status from a leader and replicas with the default resolution.
    pub fn new<L>(leader: L, replicas: Vec<ReplicaStatus>) -> Self
    where
        L: Into<ReplicaStatus>,
    {
        Self::new2(leader, replicas, PartitionResolution::default())
    }

    /// Creates a status from a leader, replicas and an explicit resolution.
    ///
    /// `lsr` starts at its default value; it is only recomputed by [`merge`](Self::merge).
    pub fn new2<L>(leader: L, replicas: Vec<ReplicaStatus>, resolution: PartitionResolution) -> Self
    where
        L: Into<ReplicaStatus>,
    {
        Self {
            resolution,
            leader: leader.into(),
            replicas,
            ..Default::default()
        }
    }

    /// Returns `true` when the resolution is `Online`.
    pub fn is_online(&self) -> bool {
        self.resolution == PartitionResolution::Online
    }

    /// Returns `true` for every resolution other than `Online`.
    pub fn is_offline(&self) -> bool {
        !self.is_online()
    }

    /// Returns the `lsr` counter computed by the last [`merge`](Self::merge).
    pub fn lsr(&self) -> u32 {
        self.lsr
    }

    /// Iterates over the tracked replica statuses.
    pub fn replica_iter(&self) -> Iter<'_, ReplicaStatus> {
        self.replicas.iter()
    }

    /// Returns the SPU id of every tracked replica.
    ///
    /// NOTE: no liveness check is applied here; every entry in `replicas` is reported.
    pub fn live_replicas(&self) -> Vec<i32> {
        self.replicas.iter().map(|replica| replica.spu).collect()
    }

    /// Always returns an empty list.
    pub fn offline_replicas(&self) -> Vec<i32> {
        Vec::new()
    }

    /// Returns `true` when at least one replica is tracked.
    pub fn has_live_replicas(&self) -> bool {
        !self.replicas.is_empty()
    }

    /// Picks the replica that should become leader.
    ///
    /// Only replicas whose SPU is in `online` are considered. Each one is
    /// scored by `policy` against the current leader; replicas for which the
    /// policy does not return `ElectionScoring::Score` are skipped. The replica
    /// with the lowest score wins, and on equal scores the earliest replica in
    /// the list is kept. Returns `None` when no replica qualifies.
    pub fn candidate_leader<P>(&self, online: &HashSet<SpuId>, policy: &P) -> Option<SpuId>
    where
        P: ElectionPolicy,
    {
        self.replicas
            .iter()
            .filter(|replica| online.contains(&replica.spu))
            .filter_map(
                |replica| match policy.potential_leader_score(replica, &self.leader) {
                    ElectionScoring::Score(score) => Some((replica.spu, score)),
                    _ => None,
                },
            )
            // `min_by_key` returns the first of several equal minimums, which
            // matches the "strictly lower replaces" rule.
            .min_by_key(|&(_, score)| score)
            .map(|(spu, _)| spu)
    }

    /// Merges `other` into this status.
    ///
    /// The resolution is taken from `other`. The leader is merged with
    /// `ReplicaStatus::merge`; when that reports a replaced leader, the old
    /// leader is appended to the replicas. Each replica of `other` is merged
    /// into the replica with the same SPU, or appended when there is none.
    /// Finally any replica entry carrying the leader's SPU is removed and `lsr`
    /// is recomputed.
    pub fn merge(&mut self, other: Self) {
        self.resolution = other.resolution;
        if let Some(old) = self.leader.merge(&other.leader) {
            self.replicas.push(old);
        }
        for status in other.replicas {
            if let Some(old_status) = find_status(&mut self.replicas, status.spu) {
                old_status.merge(&status);
            } else {
                self.replicas.push(status);
            }
        }
        // The leader must not also appear as a replica. `retain` is the stable
        // replacement for the unstable `drain_filter` and removes the same entries.
        let leader_spu = self.leader.spu;
        self.replicas.retain(|replica| replica.spu != leader_spu);
        self.update_lrs();
    }

    /// Recomputes `lsr` as the number of replicas whose `leo` is set (not -1)
    /// and equal to the leader's `leo`.
    fn update_lrs(&mut self) {
        let leader_leo = self.leader.leo;
        self.lsr = self
            .replicas
            .iter()
            .filter(|replica| replica.leo != -1 && replica.leo == leader_leo)
            .count() as u32;
    }
}
/// Returns a mutable reference to the first replica status whose SPU equals
/// `spu`, or `None` when no entry matches.
///
/// Takes a slice rather than a `Vec` because the elements are only iterated;
/// callers holding a `&mut Vec<ReplicaStatus>` still work through deref coercion.
fn find_status(status: &mut [ReplicaStatus], spu: SpuId) -> Option<&mut ReplicaStatus> {
    status.iter_mut().find(|replica| replica.spu == spu)
}
impl From<K8PartitionStatus> for PartitionStatus {
fn from(kv_status: K8PartitionStatus) -> Self {
Self {
resolution: kv_status.resolution.into(),
leader: kv_status.leader.into(),
replicas: kv_status.replicas.into_iter().map(|lrs| lrs.into()).collect(),
lsr: kv_status.lsr
}
}
}
impl From<PartitionStatus> for K8PartitionStatus {
fn from(status: PartitionStatus) -> K8PartitionStatus {
K8PartitionStatus {
resolution: status.resolution.into(),
leader: status.leader.into(),
replicas: status.replicas.into_iter().map(|lrs| lrs.into()).collect(),
lsr: status.lsr.into()
}
}
}
/// Resolution state of a partition.
///
/// The variants map one-to-one onto `K8PartitionResolution` (see the `From`
/// conversions in this file).
#[derive(Decode, Encode, Debug, Clone, PartialEq)]
pub enum PartitionResolution {
    /// The `Default` value.
    Offline,
    /// The only state for which `PartitionStatus::is_online` returns `true`.
    Online,
    // NOTE(review): presumably "the leader went offline" - confirm with the controller code.
    LeaderOffline,
    // NOTE(review): presumably "an election found a new leader" - confirm with the controller code.
    ElectionLeaderFound,
}
impl Default for PartitionResolution {
fn default() -> Self {
PartitionResolution::Offline
}
}
impl From<K8PartitionResolution> for PartitionResolution {
fn from(resolution: K8PartitionResolution) -> Self {
match resolution {
K8PartitionResolution::Offline => Self::Offline,
K8PartitionResolution::Online => Self::Online,
K8PartitionResolution::ElectionLeaderFound => Self::ElectionLeaderFound,
K8PartitionResolution::LeaderOffline => Self::LeaderOffline
}
}
}
impl From<PartitionResolution> for K8PartitionResolution {
fn from(resolution: PartitionResolution) -> Self {
match resolution {
PartitionResolution::Offline => Self::Offline,
PartitionResolution::Online => Self::Online,
PartitionResolution::LeaderOffline => Self::LeaderOffline,
PartitionResolution::ElectionLeaderFound => Self::ElectionLeaderFound
}
}
}
/// Offsets reported for one replica of a partition.
///
/// A value of `-1` in any field means "not set": it is what `Default`
/// produces and what `merge` treats as absent.
#[derive(Decode, Encode, Debug, Clone, PartialEq)]
pub struct ReplicaStatus {
    /// Id of the SPU holding the replica.
    pub spu: i32,
    /// High watermark offset.
    pub hw: i64,
    /// End offset of the replica.
    pub leo: i64,
}
impl fmt::Display for ReplicaStatus {
    /// Writes the three fields on one line as `spu:.. hw:.. leo: ..`.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let Self { spu, hw, leo } = self;
        write!(f, "spu:{} hw:{} leo: {}", spu, hw, leo)
    }
}
impl Default for ReplicaStatus {
fn default() -> Self {
ReplicaStatus {
spu: -1,
hw: -1,
leo: -1
}
}
}
impl ReplicaStatus {
    /// Creates a status from an SPU id, a high watermark and an end offset.
    pub fn new(spu: SpuId, hw: Offset, leo: Offset) -> Self {
        Self { spu, hw, leo }
    }

    /// Difference between the leader's `leo` and this replica's `leo`.
    pub fn leader_lag(&self, leader_status: &Self) -> i64 {
        let (leader_leo, own_leo) = (leader_status.leo, self.leo);
        leader_leo - own_leo
    }

    /// Difference between the leader's `hw` and this replica's `hw`.
    pub fn high_watermark_lag(&self, leader_status: &Self) -> i64 {
        let (leader_hw, own_hw) = (leader_status.hw, self.hw);
        leader_hw - own_hw
    }

    /// Merges `source` into this status.
    ///
    /// * A `source` without an SPU (`-1`) is ignored.
    /// * When this status has no SPU yet, or has the same SPU as `source`, the
    ///   SPU is taken from `source` and only offsets that are set (not `-1`)
    ///   overwrite the current ones. `None` is returned.
    /// * When the SPUs differ, all three fields are overwritten with those of
    ///   `source` and the previous status is returned.
    pub fn merge(&mut self, source: &Self) -> Option<Self> {
        if source.spu == -1 {
            return None;
        }

        let replaces_other_spu = self.spu != -1 && self.spu != source.spu;
        if replaces_other_spu {
            let previous = Self {
                spu: self.spu,
                hw: self.hw,
                leo: self.leo,
            };
            *self = Self {
                spu: source.spu,
                hw: source.hw,
                leo: source.leo,
            };
            return Some(previous);
        }

        self.spu = source.spu;
        if source.leo != -1 {
            self.leo = source.leo;
        }
        if source.hw != -1 {
            self.hw = source.hw;
        }
        None
    }
}
impl From<(SpuId, Offset, Offset)> for ReplicaStatus {
    /// Builds a status from an `(spu id, high watermark, end offset)` tuple.
    fn from((id, high_watermark, end_offset): (SpuId, Offset, Offset)) -> Self {
        Self::new(id, high_watermark, end_offset)
    }
}
impl From<K8ReplicaStatus> for ReplicaStatus {
fn from(status: K8ReplicaStatus) -> Self {
Self {
spu: status.spu,
hw: status.hw,
leo: status.leo
}
}
}
impl From<ReplicaStatus> for K8ReplicaStatus {
fn from(status: ReplicaStatus) -> Self {
Self {
spu: status.spu,
hw: status.hw,
leo: status.leo
}
}
}
#[cfg(test)]
mod test {
    use std::collections::HashSet;

    use super::ElectionPolicy;
    use super::ElectionScoring;
    use super::PartitionStatus;
    use super::ReplicaStatus;

    /// Policy used by the tests: the score is how far the replica's `leo` is
    /// behind the leader's; a lag of 4 or more is not suitable.
    struct SimplePolicy {}

    impl ElectionPolicy for SimplePolicy {
        fn potential_leader_score(&self, replica_status: &ReplicaStatus, leader: &ReplicaStatus) -> ElectionScoring {
            let lag = leader.leo - replica_status.leo;
            if lag >= 4 {
                return ElectionScoring::NotSuitable;
            }
            ElectionScoring::Score(lag as u16)
        }
    }

    /// Builds the set of online SPU ids from a slice.
    fn online_spus(ids: &[i32]) -> HashSet<i32> {
        ids.iter().cloned().collect()
    }

    #[test]
    fn test_candidate_spu_no_candidate() {
        let status = PartitionStatus::leader((5000, 0, 0));
        let elected = status.candidate_leader(&online_spus(&[]), &SimplePolicy {});
        assert_eq!(elected, None);
    }

    #[test]
    fn test_candidate_spu_best() {
        let replicas = vec![
            ReplicaStatus::new(5001, 100, 110),
            ReplicaStatus::new(5002, 100, 105),
        ];
        let status = PartitionStatus::new((5000, 100, 110), replicas);
        let elected = status.candidate_leader(&online_spus(&[5001, 5002]), &SimplePolicy {});
        assert_eq!(elected, Some(5001));
    }

    #[test]
    fn test_candidate_spu_best_conflict() {
        let replicas = vec![
            ReplicaStatus::new(5001, 95, 110),
            ReplicaStatus::new(5002, 100, 105),
        ];
        let status = PartitionStatus::new((5000, 100, 110), replicas);
        let elected = status.candidate_leader(&online_spus(&[5000, 5001, 5002]), &SimplePolicy {});
        assert_eq!(elected, Some(5001));
    }

    #[test]
    fn test_candidate_spu_no_online() {
        let replicas = vec![
            ReplicaStatus::new(5001, 95, 110),
            ReplicaStatus::new(5002, 100, 105),
        ];
        let status = PartitionStatus::new((5000, 100, 110), replicas);
        let elected = status.candidate_leader(&online_spus(&[]), &SimplePolicy {});
        assert_eq!(elected, None);
    }

    #[test]
    fn test_merge_initial() {
        let mut target = PartitionStatus::default();

        // First merge: leader only.
        target.merge(PartitionStatus::leader((5000, 10, 11)));
        assert_eq!(target.leader, ReplicaStatus::new(5000, 10, 11));
        assert!(target.replicas.is_empty());

        // Second merge: same leader plus one replica.
        target.merge(PartitionStatus::new(
            (5000, 10, 11),
            vec![ReplicaStatus::new(5001, 9, 11)],
        ));
        assert_eq!(target.replicas, vec![ReplicaStatus::new(5001, 9, 11)]);
    }

    #[test]
    fn test_merge_lrs_full() {
        let mut target = PartitionStatus::new(
            (5000, 100, 110),
            vec![
                ReplicaStatus::new(5001, 95, 110),
                ReplicaStatus::new(5002, 100, 105),
            ],
        );
        let source = PartitionStatus::new(
            (5000, 120, 120),
            vec![
                ReplicaStatus::new(5002, 110, 120),
                ReplicaStatus::new(5001, -1, -1),
            ],
        );

        target.merge(source);

        assert_eq!(target.leader, ReplicaStatus::new(5000, 120, 120));
        assert_eq!(target.replicas[0], ReplicaStatus::new(5001, 95, 110));
        assert_eq!(target.replicas[1], ReplicaStatus::new(5002, 110, 120));
    }

    #[test]
    fn test_merge_lrs_different_leader() {
        let mut target = PartitionStatus::new(
            (5000, 100, 110),
            vec![ReplicaStatus::new(5001, 95, 110)],
        );
        let source = PartitionStatus::new(
            (5001, 120, 120),
            vec![ReplicaStatus::new(5000, -1, -1)],
        );

        target.merge(source);

        assert_eq!(target.leader, ReplicaStatus::new(5001, 120, 120));
        assert_eq!(target.replicas, vec![ReplicaStatus::new(5000, 100, 110)]);
    }

    #[test]
    fn test_merge_lrs_case_2() {
        let mut target = PartitionStatus::new(
            (5002, 0, 0),
            vec![
                ReplicaStatus::new(5002, 0, 0),
                ReplicaStatus::new(5001, 0, 0),
            ],
        );
        let source = PartitionStatus::new(
            (5002, 0, 0),
            vec![ReplicaStatus::new(5001, -1, -1)],
        );

        target.merge(source);

        assert_eq!(target.leader, ReplicaStatus::new(5002, 0, 0));
        assert_eq!(target.replicas, vec![ReplicaStatus::new(5001, 0, 0)]);
    }
}