//!
//! # Fetch Topics
//!
//! Public API to retrieve Topics from the SC.
//!
use kf_protocol::api::Request;
use kf_protocol::api::FlvErrorCode;
use kf_protocol::derive::Decode;
use kf_protocol::derive::Encode;

use flv_metadata::topic::{TopicSpec, TopicStatus};

use crate::ScApiKey;

// -----------------------------------
// FlvFetchTopicsRequest
// -----------------------------------

/// Request for topic information.
///
/// Its `Request` implementation in this file pairs it with
/// `FlvFetchTopicsResponse` as the response type.
// NOTE(review): Encode/Decode are derived, so field order presumably defines
// the encoded layout — confirm before reordering or inserting fields.
#[derive(Decode, Encode, Default, Debug)]
pub struct FlvFetchTopicsRequest {
    /// A list of one or more topics to be retrieved.
    /// `None` retrieves all topics.
    pub names: Option<Vec<String>>,
}

// -----------------------------------
// FlvFetchTopicsResponse
// -----------------------------------

/// Response type associated with `FlvFetchTopicsRequest`.
///
/// Wraps a list of per-topic results (`FlvFetchTopicResponse`).
#[derive(Encode, Decode, Default, Debug)]
pub struct FlvFetchTopicsResponse {
    /// The list of topics that have been retrieved.
    pub topics: Vec<FlvFetchTopicResponse>,
}

/// Result for a single topic inside a `FlvFetchTopicsResponse`.
///
/// The constructors in this file produce two shapes:
///  * found: `error_code` is `FlvErrorCode::None` and `topic` is `Some(..)`
///  * not found: `error_code` is `FlvErrorCode::TopicNotFound` and `topic` is `None`
// NOTE(review): Encode/Decode are derived, so field order presumably defines
// the encoded layout — confirm before reordering or inserting fields.
#[derive(Encode, Decode, Default, Debug)]
pub struct FlvFetchTopicResponse {
    /// The error code, `FlvErrorCode::None` for no errors.
    pub error_code: FlvErrorCode,

    /// The name of the topic.
    pub name: String,

    /// Topic parameters, `None` if error.
    pub topic: Option<FlvFetchTopic>,
}

/// Details of a topic: its spec, its status and, optionally, the replica
/// assignment of its partitions.
// NOTE(review): Encode/Decode are derived, so field order presumably defines
// the encoded layout — confirm before reordering or inserting fields.
#[derive(Encode, Decode, Default, Debug)]
pub struct FlvFetchTopic {
    /// Topic spec
    pub spec: TopicSpec,

    /// Topic status
    pub status: TopicStatus,

    /// Replica assignment for each partition.
    /// Optional; it can be replaced later through
    /// `FlvFetchTopicResponse::update_partitions`.
    pub partition_replicas: Option<Vec<FlvPartitionReplica>>,
}

/// Replica information for one partition of a topic.
// NOTE(review): `leader`, `replicas` and `live_replicas` presumably hold the
// ids of the servers hosting the replicas — confirm against the producer of
// this message.
// NOTE(review): Encode/Decode are derived, so field order presumably defines
// the encoded layout — confirm before reordering or inserting fields.
#[derive(Encode, Decode, Default, Debug)]
pub struct FlvPartitionReplica {
    /// Partition id
    pub id: i32,

    /// Replica leader
    pub leader: i32,

    /// Replica assignment
    pub replicas: Vec<i32>,

    /// Only live replicas in replica assignment
    pub live_replicas: Vec<i32>,
}

// -----------------------------------
// Implementation - FlvFetchTopicsRequest
// -----------------------------------

impl Request for FlvFetchTopicsRequest {
    /// Response type paired with this request.
    type Response = FlvFetchTopicsResponse;

    /// API key of this request, taken from `ScApiKey::FlvFetchTopics`.
    const API_KEY: u16 = ScApiKey::FlvFetchTopics as u16;
}

// -----------------------------------
// Implementation - FlvFetchTopicResponse
// -----------------------------------
impl FlvFetchTopicResponse {
    /// Builds the response for a topic that was found.
    ///
    /// The error code is set to `FlvErrorCode::None` and `topic` is populated
    /// from `spec`, `status` and `partition_replicas`.
    pub fn new(
        name: String,
        spec: TopicSpec,
        status: TopicStatus,
        partition_replicas: Option<Vec<FlvPartitionReplica>>,
    ) -> Self {
        Self {
            name,
            error_code: FlvErrorCode::None,
            topic: Some(FlvFetchTopic {
                spec,
                status,
                partition_replicas,
            }),
        }
    }

    /// Builds the response for a topic that was not found.
    ///
    /// The error code is set to `FlvErrorCode::TopicNotFound` and `topic`
    /// is left as `None`.
    pub fn new_not_found(name: String) -> Self {
        Self {
            name,
            error_code: FlvErrorCode::TopicNotFound,
            topic: None,
        }
    }

    /// Replaces the partition replica assignment of the contained topic.
    ///
    /// Requirements:
    ///  * Must be called on a response that holds a topic (`topic` is `Some`);
    ///    otherwise the call is a silent no-op and `partition_replicas` is dropped.
    pub fn update_partitions(&mut self, partition_replicas: Option<Vec<FlvPartitionReplica>>) {
        // Borrow the inner topic mutably instead of checking `is_some()` and unwrapping.
        if let Some(topic) = self.topic.as_mut() {
            topic.partition_replicas = partition_replicas;
        }
    }
}