use std::collections::BTreeMap;
use std::fmt;
use kf_protocol::derive::{Decode, Encode};
use k8_metadata::topic::TopicStatus as K8TopicStatus;
use k8_metadata::topic::TopicStatusResolution as K8TopicStatusResolution;
use types::{ReplicaMap, SpuId};
#[derive(Decode, Encode, Debug, Clone, PartialEq)]
pub struct TopicStatus {
pub resolution: TopicResolution,
pub replica_map: BTreeMap<i32, Vec<i32>>,
pub reason: String,
}
impl fmt::Display for TopicStatus {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f,"{:#?}",self.resolution)
}
}
#[derive(Decode, Encode, Debug, Clone, PartialEq)]
pub enum TopicResolution {
Init, Pending, InsufficientResources, InvalidConfig, Provisioned, }
impl TopicResolution {
pub fn resolution_label(&self) -> &'static str {
match self {
TopicResolution::Provisioned => "provisioned",
TopicResolution::Init => "initializing",
TopicResolution::Pending => "pending",
TopicResolution::InsufficientResources => "insufficient-resources",
TopicResolution::InvalidConfig => "invalid-config",
}
}
pub fn is_invalid(&self) -> bool {
match self {
Self::InvalidConfig => true,
_ => false
}
}
pub fn no_resource(&self) -> bool {
match self {
Self::InsufficientResources => true,
_ => false
}
}
}
impl std::fmt::Display for TopicResolution {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "resolution::{}", self.resolution_label())
}
}
impl From<K8TopicStatus> for TopicStatus {
fn from(kv_status: K8TopicStatus) -> Self {
let resolution = match kv_status.resolution {
K8TopicStatusResolution::Provisioned => TopicResolution::Provisioned,
K8TopicStatusResolution::Init => TopicResolution::Init,
K8TopicStatusResolution::Pending => TopicResolution::Pending,
K8TopicStatusResolution::InsufficientResources => TopicResolution::InsufficientResources,
K8TopicStatusResolution::InvalidConfig => TopicResolution::InvalidConfig,
};
TopicStatus {
resolution,
replica_map: kv_status.replica_map.clone(),
reason: kv_status.reason.clone(),
}
}
}
impl From<TopicStatus> for K8TopicStatus {
fn from(status: TopicStatus) -> K8TopicStatus {
let resolution = match status.resolution {
TopicResolution::Provisioned => K8TopicStatusResolution::Provisioned,
TopicResolution::Init => K8TopicStatusResolution::Init,
TopicResolution::Pending => K8TopicStatusResolution::Pending,
TopicResolution::InsufficientResources => K8TopicStatusResolution::InsufficientResources,
TopicResolution::InvalidConfig => K8TopicStatusResolution::InvalidConfig,
};
K8TopicStatus {
resolution: resolution,
replica_map: status.replica_map.clone(),
reason: status.reason.clone(),
}
}
}
impl ::std::default::Default for TopicStatus {
fn default() -> Self {
TopicStatus {
resolution: TopicResolution::Init,
replica_map: BTreeMap::new(),
reason: "".to_owned(),
}
}
}
impl ::std::default::Default for TopicResolution {
fn default() -> Self {
TopicResolution::Init
}
}
fn create_replica_map(rows: Vec<Vec<i32>>) -> BTreeMap<i32, Vec<i32>> {
let mut map = BTreeMap::new();
for (idx, row) in rows.iter().enumerate() {
map.insert(idx as i32, row.clone());
}
map
}
impl TopicStatus {
pub fn new<S>(
resolution: TopicResolution,
replica_map: Vec<Vec<i32>>,
reason: S,
) -> Self where S: Into<String> {
TopicStatus {
resolution: resolution,
replica_map: create_replica_map(replica_map),
reason: reason.into()
}
}
pub fn resolution(&self) -> &TopicResolution {
&self.resolution
}
pub fn replica_map_cnt(&self) -> i32 {
self.replica_map.len() as i32
}
pub fn set_replica_map(&mut self, replica_map: ReplicaMap) {
self.replica_map = replica_map;
}
pub fn spus_in_replica(&self) -> Vec<SpuId> {
let mut spu_list: Vec<SpuId> = vec![];
for (_, replicas) in self.replica_map.iter() {
for spu in replicas {
if !spu_list.contains(spu) {
spu_list.push(*spu);
}
}
}
spu_list
}
pub fn replica_map_str(&self) -> String {
format!("{:?}", self.replica_map)
}
pub fn replica_map_cnt_str(&self) -> String {
let map_rows = self.replica_map_cnt();
if map_rows > 0 {
format!("{}", map_rows)
} else {
"-".to_owned()
}
}
pub fn reason_str(&self) -> &String {
&self.reason
}
pub fn is_resolution_initializing(&self) -> bool {
self.resolution == TopicResolution::Init
}
pub fn need_replica_map_recal(&self) -> bool {
self.resolution == TopicResolution::Pending ||
self.resolution == TopicResolution::InsufficientResources
}
pub fn is_resolution_pending(&self) -> bool {
self.resolution == TopicResolution::Pending
}
pub fn is_resolution_transient(&self) -> bool {
self.resolution == TopicResolution::Init ||
self.resolution == TopicResolution::Pending
}
pub fn is_resolution_provisioned(&self) -> bool {
self.resolution == TopicResolution::Provisioned
}
pub fn next_resolution_provisoned() -> (TopicResolution,String){
(TopicResolution::Provisioned,"".to_owned())
}
pub fn next_resolution_pending() -> (TopicResolution,String) {
(TopicResolution::Pending,super::PENDING_REASON.to_owned())
}
pub fn next_resolution_invalid_config<S>(reason: S) -> (TopicResolution,String) where S: Into<String> {
(TopicResolution::InvalidConfig,reason.into())
}
pub fn set_resolution_no_resource<S>(reason: S) -> (TopicResolution,String) where S: Into<String> {
(TopicResolution::InsufficientResources, reason.into())
}
pub fn set_next_resolution(&mut self,next: (TopicResolution,String)) {
let (resolution,reason) = next;
self.resolution = resolution;
self.reason = reason;
}
}