1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
//!
//! # Fluvio -- Topic/Partition Parameters
//!
//! Intermediate structure to collect metadata information
//!
use kf_protocol::api::Offset;

/// Fetch Logs parameters
#[derive(Debug)]
pub struct FetchLogsParam {
    pub topic: String,
    pub max_bytes: i32,

    pub partitions: Vec<PartitionParam>,
}

/// Topic/Partition parameters
#[derive(Debug, Clone, PartialEq)]
pub struct TopicPartitionParam {
    pub topic_name: String,

    pub leaders: Vec<LeaderParam>,
}

/// Replica Leader parameters
#[derive(Debug, Clone, PartialEq)]
pub struct LeaderParam {
    pub leader_id: i32,
    pub server_addr: String,

    pub partitions: Vec<PartitionParam>,
}

/// Partition parameters
#[derive(Debug, Clone, PartialEq, Default)]
pub struct PartitionParam {
    pub partition_idx: i32,
    pub offset: Offset,
    pub epoch: i32,
}

pub use topic_partition::*;


mod topic_partition {

    use serde::Deserialize;
    use std::io::Error as IoError;
    use std::io::ErrorKind;
    use std::fs::read_to_string;
    use std::path::Path;

    use kf_protocol::message::topic::CreatableReplicaAssignment;
    use sc_api::topic::FlvTopicPartitionMap;


    #[derive(Debug)]
    pub enum ReplicaConfig {
        // replica assignment
        Assigned(Partitions),

        // partitions, replication, ignore_rack_assignment
        Computed(i32, i16, bool),
    }



    #[derive(Debug, Deserialize, PartialEq)]
    #[serde(rename_all = "camelCase")]
    pub struct Partitions {
        partitions: Vec<Partition>,
    }

    #[derive(Debug, Deserialize, PartialEq)]
    pub struct Partition {
        id: i32,
        replicas: Vec<i32>,
    }



    impl Partitions {
        /// Read and decode the json file into Replica Assignment map
        pub fn file_decode<T: AsRef<Path>>(path: T) -> Result<Self, IoError> {
            let file_str: String = read_to_string(path)?;
            serde_json::from_str(&file_str)
                .map_err(|err| IoError::new(ErrorKind::InvalidData, format!("{}", err)))
        }

        // Encode Replica Assignment map into Kafka Create Replica Assignment
        pub fn kf_encode(&self) -> Vec<CreatableReplicaAssignment> {
            let mut assignments: Vec<CreatableReplicaAssignment> = vec![];
            for partition in &self.partitions {
                assignments.push(CreatableReplicaAssignment {
                    partition_index: partition.id,
                    broker_ids: partition.replicas.clone(),
                })
            }
            assignments
        }

        // Encode Replica Assignment map into Fluvio format
        pub fn sc_encode(&self) -> Vec<FlvTopicPartitionMap> {
            let mut partition_map: Vec<FlvTopicPartitionMap> = vec![];
            for partition in &self.partitions {
                partition_map.push(FlvTopicPartitionMap {
                    id: partition.id,
                    replicas: partition.replicas.clone(),
                })
            }
            partition_map
        }
    }


}