use std::io::{Error, ErrorKind};
use std::collections::BTreeMap;
use log::trace;
use types::{ReplicaMap, SpuId};
use types::{PartitionId, PartitionCount, ReplicationFactor, IgnoreRackAssignment};
use kf_protocol::Version;
use kf_protocol::bytes::{Buf, BufMut};
use kf_protocol::derive::{Decode, Encode};
use kf_protocol::{Decoder, Encoder};
use k8_metadata::topic::TopicSpec as K8TopicSpec;
use k8_metadata::topic::Partition as K8Partition;
/// Replica parameters carried by `TopicSpec::Computed`: only the counts are
/// given, no explicit partition-to-spu assignment.
///
/// `Encode`/`Decode` are derived. The unit test in this file pins the encoded
/// form as partitions, then replication factor, then the rack flag.
#[derive(Debug,Clone,Default,PartialEq,Encode,Decode)]
pub struct TopicReplicaParam {
/// Number of partitions for the topic.
pub partitions: PartitionCount,
/// Number of replicas per partition.
pub replication_factor: ReplicationFactor,
/// Flag surfaced through `TopicSpec::ignore_rack_assignment`.
pub ignore_rack_assignment: IgnoreRackAssignment
}
impl TopicReplicaParam {
pub fn new(partitions: PartitionCount,replication_factor: ReplicationFactor,
ignore_rack_assignment: IgnoreRackAssignment) -> Self {
Self {
partitions,
replication_factor,
ignore_rack_assignment
}
}
}
impl From<(PartitionCount,ReplicationFactor,IgnoreRackAssignment)> for TopicReplicaParam {
fn from(value: (PartitionCount,ReplicationFactor,IgnoreRackAssignment)) -> Self {
let (partitions,replication_factor,ignore_rack) = value;
Self::new(partitions,replication_factor,ignore_rack)
}
}
impl std::fmt::Display for TopicReplicaParam {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "replica param::(p:{}, r:{})", self.partitions, self.replication_factor)
}
}
/// Ordered list of partition-to-replica assignments; the payload of
/// `TopicSpec::Assigned`.
///
/// `Encode`/`Decode` are derived. The unit test in this file shows the list
/// encoded as an element count followed by each `PartitionMap`.
#[derive(Debug,Default,Clone,PartialEq,Encode,Decode)]
pub struct PartitionMaps {
// One entry per partition, kept in the order supplied by the caller.
maps: Vec<PartitionMap>
}
impl From<Vec<PartitionMap>> for PartitionMaps {
fn from(maps: Vec<PartitionMap>) -> Self {
Self {
maps
}
}
}
impl From<Vec<(i32, Vec<i32>)>> for PartitionMaps {
    /// Builds the partition maps from `(partition id, replica spu ids)` pairs,
    /// keeping the pairs in their original order.
    fn from(partition_vec: Vec<(i32, Vec<i32>)>) -> Self {
        let mut entries: Vec<PartitionMap> = Vec::with_capacity(partition_vec.len());
        for (id, replicas) in partition_vec {
            entries.push(PartitionMap { id, replicas });
        }
        Self::from(entries)
    }
}
impl std::fmt::Display for PartitionMaps {
    /// Writes a short summary containing only the number of partition maps.
    ///
    /// The previous text misspelled "partition" and ended with a closing
    /// parenthesis that had no opening counterpart.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "partition map:({})", self.maps.len())
    }
}
impl PartitionMaps {
    /// Borrows the underlying list of partition maps.
    pub fn maps(&self) -> &Vec<PartitionMap> {
        &self.maps
    }

    /// Consumes the wrapper and returns the owned list of partition maps.
    pub fn maps_owned(self) -> Vec<PartitionMap> {
        self.maps
    }

    /// Returns the number of partition maps, or `None` when that count is not
    /// positive (i.e. the list is empty).
    fn partition_count(&self) -> Option<PartitionCount> {
        let partitions = self.maps.len() as PartitionCount;
        if partitions > 0 {
            Some(partitions)
        } else {
            None
        }
    }

    /// Returns the replica count of the first partition map, or `None` when
    /// there are no partition maps. Other entries are not inspected here.
    fn replication_factor(&self) -> Option<ReplicationFactor> {
        self.maps
            .first()
            .map(|partition| partition.replicas.len() as ReplicationFactor)
    }

    /// Renders every entry as `id:[replicas]`, separated by `", "`.
    ///
    /// Always returns `Some`; an empty list yields an empty string.
    fn partition_map_string(&self) -> Option<String> {
        let entries: Vec<String> = self
            .maps
            .iter()
            .map(|partition| format!("{}:{:?}", partition.id, partition.replicas))
            .collect();
        Some(entries.join(", "))
    }

    /// Returns every spu id referenced by any partition, without duplicates,
    /// in order of first appearance.
    pub fn unique_spus_in_partition_map(&self) -> Vec<SpuId> {
        // The set answers "already seen?" while the vector keeps the
        // first-seen order that callers observe.
        let mut seen = std::collections::HashSet::new();
        let mut spu_ids: Vec<SpuId> = Vec::new();
        for spu in self.maps.iter().flat_map(|partition| partition.replicas.iter()) {
            if seen.insert(*spu) {
                spu_ids.push(*spu);
            }
        }
        spu_ids
    }

    /// Builds a map from partition id to a copy of that partition's replicas.
    ///
    /// When the same id occurs more than once, the last entry wins.
    pub fn partition_map_to_replica_map(&self) -> ReplicaMap {
        let mut replica_map: ReplicaMap = BTreeMap::new();
        replica_map.extend(
            self.maps
                .iter()
                .map(|partition| (partition.id, partition.replicas.clone())),
        );
        replica_map
    }

    /// Checks that the partition maps form a valid assignment.
    ///
    /// The rules, checked partition by partition in list order:
    /// * there is at least one partition;
    /// * ids start at 0 and increase by one with no gaps;
    /// * the first partition has at least one replica and every other
    ///   partition has the same number of replicas;
    /// * a partition lists no spu id twice;
    /// * no spu id is negative.
    ///
    /// # Errors
    ///
    /// Returns an `ErrorKind::InvalidInput` error describing the first rule
    /// that is violated.
    pub fn valid_partition_map(&self) -> Result<(), Error> {
        // The first partition fixes the replica count all others must match.
        let replica_len = match self.maps.first() {
            Some(first) => first.replicas.len(),
            None => return Err(Self::invalid_input("no assigned partitions found")),
        };

        for (expected_id, partition) in (0..).zip(&self.maps) {
            if expected_id == 0 {
                if partition.id != expected_id {
                    return Err(Self::invalid_input(
                        "assigned partitions must start with id 0",
                    ));
                }
                if replica_len == 0 {
                    return Err(Self::invalid_input(
                        "assigned replicas must have at least one spu id",
                    ));
                }
            } else {
                if partition.id != expected_id {
                    return Err(Self::invalid_input(
                        "assigned partition ids must be in sequence and without gaps",
                    ));
                }
                if partition.replicas.len() != replica_len {
                    return Err(Self::invalid_input(format!(
                        "all assigned replicas must have the same number of spu ids: {}",
                        replica_len
                    )));
                }
            }

            // Sorting a copy and dropping adjacent repeats leaves one entry
            // per distinct spu id; a shorter result means a duplicate.
            let mut distinct = partition.replicas.clone();
            distinct.sort_unstable();
            distinct.dedup();
            if distinct.len() != partition.replicas.len() {
                return Err(Self::invalid_input(format!(
                    "duplicate spu ids found in assigned partition with id: {}",
                    expected_id
                )));
            }

            if let Some(spu_id) = partition.replicas.iter().find(|spu_id| **spu_id < 0) {
                return Err(Self::invalid_input(format!(
                    "invalid spu id: {} in assigned partition with id: {}",
                    spu_id, expected_id
                )));
            }
        }

        Ok(())
    }

    /// Builds the `InvalidInput` error used by every validation failure.
    fn invalid_input<M: Into<String>>(message: M) -> Error {
        Error::new(ErrorKind::InvalidInput, message.into())
    }
}
/// Replica specification of a topic.
///
/// The hand-written `Encoder`/`Decoder` impls in this file tag the variants
/// with a leading `u8`: `0` for `Assigned`, `1` for `Computed`.
#[derive(Debug,Clone,PartialEq)]
pub enum TopicSpec {
/// Partitions and their replicas are listed explicitly.
Assigned(PartitionMaps),
/// Only the partition count, replication factor and rack flag are given.
Computed(TopicReplicaParam)
}
impl Default for TopicSpec {
fn default() -> TopicSpec {
TopicSpec::Assigned(PartitionMaps::default())
}
}
impl TopicSpec {
pub fn new_assigned<J>(partition_map: J) -> Self where J: Into<PartitionMaps> {
TopicSpec::Assigned(partition_map.into())
}
pub fn new_computed(
partitions: PartitionCount,
replication: ReplicationFactor,
ignore_rack: Option<IgnoreRackAssignment>,
) -> Self {
TopicSpec::Computed((partitions, replication, ignore_rack.unwrap_or(false)).into())
}
pub fn is_computed(&self) -> bool {
match self {
TopicSpec::Computed(_) => true,
TopicSpec::Assigned(_) => false,
}
}
pub fn partitions(&self) -> Option<PartitionCount> {
match self {
TopicSpec::Computed(param) => Some(param.partitions),
TopicSpec::Assigned(partition_map) => partition_map.partition_count()
}
}
pub fn replication_factor(&self) -> Option<ReplicationFactor> {
match self {
TopicSpec::Computed(param) => Some(param.replication_factor),
TopicSpec::Assigned(partition_map) => partition_map.replication_factor()
}
}
pub fn ignore_rack_assignment(&self) -> IgnoreRackAssignment {
match self {
TopicSpec::Computed(param) => param.ignore_rack_assignment,
TopicSpec::Assigned(_) => false,
}
}
pub fn type_label(is_computed: &bool) -> &'static str {
match is_computed {
true => "computed",
false => "assigned",
}
}
pub fn partitions_str(partition_cnt: &Option<PartitionCount>) -> String {
match partition_cnt {
Some(partitions) => partitions.to_string(),
None => "-".to_string(),
}
}
pub fn replication_factor_str(replication_cnt: &Option<ReplicationFactor>) -> String {
match replication_cnt {
Some(replication) => replication.to_string(),
None => "-".to_string(),
}
}
pub fn ignore_rack_assign_str(ignore_rack_assign: &bool) -> &'static str {
match ignore_rack_assign {
true => "yes",
false => "-",
}
}
pub fn partition_map_str(&self) -> Option<String> {
match self {
TopicSpec::Computed(_) => None,
TopicSpec::Assigned(partition_map) => partition_map.partition_map_string()
}
}
pub fn valid_partition(partitions: &PartitionCount) -> Result<(), Error> {
if *partitions < 0 {
return Err(Error::new(
ErrorKind::InvalidInput,
"partition is mandatory for computed topics",
));
}
if *partitions == 0 {
return Err(Error::new(
ErrorKind::InvalidInput,
"partition must be greater than 0",
));
}
Ok(())
}
pub fn valid_replication_factor(replication: &ReplicationFactor) -> Result<(), Error> {
if *replication < 0 {
return Err(Error::new(
ErrorKind::InvalidInput,
"replication factor is mandatory for computed topics",
));
}
if *replication == 0 {
return Err(Error::new(
ErrorKind::InvalidInput,
"replication factor must be greater than 0",
));
}
Ok(())
}
}
impl Decoder for TopicSpec {
fn decode<T>(&mut self, src: &mut T,version: Version) -> Result<(), Error>
where
T: Buf,
{
let mut typ: u8 = 0;
typ.decode(src,version)?;
trace!("decoded type: {}", typ);
match typ {
0 => {
let mut partition_map = PartitionMaps::default();
partition_map.decode(src,version)?;
*self = Self::Assigned(partition_map);
Ok(())
}
1 => {
let mut param = TopicReplicaParam::default();
param.decode(src,version)?;
*self =TopicSpec::Computed(param);
Ok(())
}
_ => Err(Error::new(
ErrorKind::UnexpectedEof,
format!("unknown replica type {}", typ),
)),
}
}
}
impl Encoder for TopicSpec {
fn write_size(&self, version: Version) -> usize {
let typ_size = (0 as u8).write_size(version);
match self {
TopicSpec::Assigned(partitions) => typ_size + partitions.write_size(version),
TopicSpec::Computed(param) => {
typ_size + param.write_size(version)
}
}
}
fn encode<T>(&self, dest: &mut T,version: Version) -> Result<(), Error>
where
T: BufMut,
{
if dest.remaining_mut() < self.write_size(version) {
return Err(Error::new(
ErrorKind::UnexpectedEof,
format!(
"not enough capacity for replica len of {}",
self.write_size(version)
),
));
}
match self {
TopicSpec::Assigned(partitions) => {
let typ: u8 = 0;
typ.encode(dest,version)?;
partitions.encode(dest,version)?;
}
TopicSpec::Computed(param) => {
let typ: u8 = 1;
typ.encode(dest,version)?;
param.encode(dest,version)?;
}
}
Ok(())
}
}
impl From<TopicSpec> for K8TopicSpec {
    /// Converts a topic spec into its K8 counterpart.
    ///
    /// A computed spec fills the first three arguments of `K8TopicSpec::new`
    /// (partitions, replication factor, rack flag) and leaves the fourth
    /// empty; an assigned spec fills only the fourth with its partition list.
    fn from(spec: TopicSpec) -> Self {
        let (partitions, replication_factor, ignore_rack_assignment, assigned_partitions) =
            match spec {
                TopicSpec::Assigned(maps) => {
                    (None, None, None, Some(replica_map_to_k8_partition(maps)))
                }
                TopicSpec::Computed(param) => (
                    Some(param.partitions),
                    Some(param.replication_factor),
                    Some(param.ignore_rack_assignment),
                    None,
                ),
            };

        K8TopicSpec::new(
            partitions,
            replication_factor,
            ignore_rack_assignment,
            assigned_partitions,
        )
    }
}
/// Converts each partition map into a `K8Partition`, preserving order.
///
/// The argument is taken by value, so the replica lists are moved into the
/// result instead of being cloned.
fn replica_map_to_k8_partition(partition_maps: PartitionMaps) -> Vec<K8Partition> {
    partition_maps
        .maps_owned()
        .into_iter()
        .map(|partition| K8Partition::new(partition.id, partition.replicas))
        .collect()
}
impl From<(PartitionCount,ReplicationFactor,IgnoreRackAssignment)> for TopicSpec {
fn from(spec: (PartitionCount,ReplicationFactor,IgnoreRackAssignment)) -> Self {
let (count,factor, rack) = spec;
Self::new_computed(count,factor,Some(rack))
}
}
impl From<(PartitionCount,ReplicationFactor)> for TopicSpec {
fn from(spec: (PartitionCount,ReplicationFactor)) -> Self {
let (count,factor) = spec;
Self::new_computed(count,factor,Some(false))
}
}
/// A single partition together with the spus that hold its replicas.
///
/// `Encode`/`Decode` are derived. The unit test in this file shows the entry
/// encoded as the id, then the replica count, then each replica spu id.
#[derive(Decode, Encode, Default, Debug, Clone, PartialEq)]
pub struct PartitionMap {
/// Partition id; `PartitionMaps::valid_partition_map` requires ids to run 0, 1, 2, ...
pub id: PartitionId,
/// Ids of the spus assigned to this partition.
pub replicas: Vec<SpuId>,
}
#[cfg(test)]
pub mod test {
    use super::*;
    use std::io::Cursor;

    /// Builds a `PartitionMaps` from `(partition id, replica spu ids)` pairs.
    fn partition_maps(entries: Vec<(i32, Vec<i32>)>) -> PartitionMaps {
        entries.into()
    }

    /// Asserts that `result` failed and that its error displays as `expected`.
    fn assert_error_message(result: Result<(), Error>, expected: &str) {
        assert!(result.is_err());
        assert_eq!(format!("{}", result.unwrap_err()), expected);
    }

    /// Validates the partition map built from `entries` and checks that it is
    /// rejected with the `expected` message.
    fn assert_invalid_map(entries: Vec<(i32, Vec<i32>)>, expected: &str) {
        assert_error_message(partition_maps(entries).valid_partition_map(), expected);
    }

    #[test]
    fn test_is_computed_topic() {
        let assigned = TopicSpec::new_assigned(partition_maps(vec![(1, vec![0]), (2, vec![2])]));
        assert!(!assigned.is_computed());

        let computed = TopicSpec::new_computed(0, 0, None);
        assert!(computed.is_computed());
    }

    #[test]
    fn test_valid_computed_replica_params() {
        // Partition count: negative, zero, then a valid value.
        assert_error_message(
            TopicSpec::valid_partition(&-1),
            "partition is mandatory for computed topics",
        );
        assert_error_message(
            TopicSpec::valid_partition(&0),
            "partition must be greater than 0",
        );
        assert!(TopicSpec::valid_partition(&1).is_ok());

        // Replication factor: negative, zero, then a valid value.
        assert_error_message(
            TopicSpec::valid_replication_factor(&-1),
            "replication factor is mandatory for computed topics",
        );
        assert_error_message(
            TopicSpec::valid_replication_factor(&0),
            "replication factor must be greater than 0",
        );
        assert!(TopicSpec::valid_replication_factor(&1).is_ok());
    }

    #[test]
    fn test_replica_map_ids() {
        // Ids must start at 0.
        assert_invalid_map(
            vec![(1, vec![0]), (2, vec![2])],
            "assigned partitions must start with id 0",
        );

        // A gap in the id sequence.
        assert_invalid_map(
            vec![(0, vec![0]), (2, vec![2])],
            "assigned partition ids must be in sequence and without gaps",
        );

        // Ids out of order.
        assert_invalid_map(
            vec![(0, vec![0]), (2, vec![2]), (1, vec![1])],
            "assigned partition ids must be in sequence and without gaps",
        );

        // A repeated id.
        assert_invalid_map(
            vec![(0, vec![0]), (1, vec![1]), (1, vec![1])],
            "assigned partition ids must be in sequence and without gaps",
        );

        // A well-formed sequence.
        let valid = partition_maps(vec![(0, vec![1]), (1, vec![1]), (2, vec![2])]);
        assert!(valid.valid_partition_map().is_ok());
    }

    #[test]
    fn test_replica_map_spu_ids() {
        // The first partition has no replicas.
        assert_invalid_map(
            vec![(0, vec![]), (1, vec![1])],
            "assigned replicas must have at least one spu id",
        );

        // Replica counts differ between partitions.
        assert_invalid_map(
            vec![(0, vec![1, 2]), (1, vec![1])],
            "all assigned replicas must have the same number of spu ids: 2",
        );

        // Duplicate spu id in the second partition.
        assert_invalid_map(
            vec![(0, vec![1, 2]), (1, vec![1, 1])],
            "duplicate spu ids found in assigned partition with id: 1",
        );

        // Duplicate spu id that is not adjacent in the input.
        assert_invalid_map(
            vec![(0, vec![3, 1, 2, 3])],
            "duplicate spu ids found in assigned partition with id: 0",
        );

        // Negative spu id.
        assert_invalid_map(
            vec![(0, vec![1, 2]), (1, vec![1, -2])],
            "invalid spu id: -2 in assigned partition with id: 1",
        );
    }

    #[test]
    fn test_unique_spus_in_partition_map() {
        let maps = partition_maps(vec![
            (0, vec![0, 1, 3]),
            (1, vec![0, 2, 3]),
            (2, vec![1, 3, 4]),
        ]);

        let expected: Vec<i32> = vec![0, 1, 3, 2, 4];
        assert_eq!(maps.unique_spus_in_partition_map(), expected);
    }

    #[test]
    fn test_encode_decode_assigned_topic_spec() {
        let topic_spec = TopicSpec::Assigned(
            vec![PartitionMap {
                id: 0,
                replicas: vec![5001, 5002],
            }]
            .into(),
        );

        // Encode and compare against the expected byte layout.
        let mut dest = vec![];
        let result = topic_spec.encode(&mut dest, 0);
        assert!(result.is_ok());

        let expected_dest = [
            0x00, // variant tag: assigned
            0x00, 0x00, 0x00, 0x01, // number of partition maps
            0x00, 0x00, 0x00, 0x00, // partition id
            0x00, 0x00, 0x00, 0x02, // number of replicas
            0x00, 0x00, 0x13, 0x89, // spu 5001
            0x00, 0x00, 0x13, 0x8a, // spu 5002
        ];
        assert_eq!(dest, expected_dest);

        // Decode the same bytes back.
        let mut topic_spec_decoded = TopicSpec::default();
        let result = topic_spec_decoded.decode(&mut Cursor::new(&expected_dest), 0);
        assert!(result.is_ok());

        match topic_spec_decoded {
            TopicSpec::Assigned(decoded_map) => {
                let expected_map: PartitionMaps = vec![PartitionMap {
                    id: 0,
                    replicas: vec![5001, 5002],
                }]
                .into();
                assert_eq!(decoded_map, expected_map);
            }
            other => panic!("expect assigned topic spec, found {:?}", other),
        }
    }

    #[test]
    fn test_encode_decode_computed_topic_spec() {
        let topic_spec = TopicSpec::Computed((2, 3, true).into());

        // Encode and compare against the expected byte layout.
        let mut dest = vec![];
        let result = topic_spec.encode(&mut dest, 0);
        assert!(result.is_ok());

        let expected_dest = [
            0x01, // variant tag: computed
            0x00, 0x00, 0x00, 0x02, // partitions
            0x00, 0x00, 0x00, 0x03, // replication factor
            0x01, // ignore rack assignment
        ];
        assert_eq!(dest, expected_dest);

        // Decode the same bytes back.
        let mut topic_spec_decoded = TopicSpec::default();
        let result = topic_spec_decoded.decode(&mut Cursor::new(&expected_dest), 0);
        assert!(result.is_ok());

        match topic_spec_decoded {
            TopicSpec::Computed(param) => {
                assert_eq!(param.partitions, 2);
                assert_eq!(param.replication_factor, 3);
                assert!(param.ignore_rack_assignment);
            }
            other => panic!("expect computed topic spec, found {:?}", other),
        }
    }

    #[test]
    fn test_partition_map_str() {
        let populated = TopicSpec::new_assigned(partition_maps(vec![
            (0, vec![0, 1, 3]),
            (1, vec![0, 2, 3]),
            (2, vec![1, 3, 4]),
        ]));
        assert_eq!(
            populated.partition_map_str(),
            Some("0:[0, 1, 3], 1:[0, 2, 3], 2:[1, 3, 4]".to_string())
        );

        // An empty map still yields `Some`, holding an empty string.
        let empty = TopicSpec::new_assigned(PartitionMaps::default());
        assert_eq!(empty.partition_map_str(), Some("".to_string()));
    }
}