1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
use crate::{client::Client, proto, Result};
use tracing::debug;

/// Request to obtain the current topology of the cluster the gateway is part of.
#[derive(Debug)]
pub struct TopologyBuilder(Client);

impl TopologyBuilder {
    /// Create a new topology request builder.
    pub fn new(client: Client) -> Self {
        TopologyBuilder(client)
    }

    /// Send a topology request to the configured gateway.
    #[tracing::instrument(skip(self), name = "topology")]
    pub async fn send(mut self) -> Result<TopologyResponse> {
        let req = proto::TopologyRequest {};
        debug!(?req, "sending request");

        let res = self
            .0
            .gateway_client
            .topology(tonic::Request::new(req))
            .await?;
        Ok(TopologyResponse(res.into_inner()))
    }
}

/// The current topology of the cluster
#[derive(Debug)]
pub struct TopologyResponse(proto::TopologyResponse);

impl TopologyResponse {
    /// List of brokers part of this cluster
    pub fn brokers(&self) -> Vec<BrokerInfo> {
        self.0
            .brokers
            .iter()
            .map(|proto| BrokerInfo(proto.clone()))
            .collect()
    }

    /// How many nodes are in the cluster.
    pub fn cluster_size(&self) -> u32 {
        self.0.cluster_size as u32
    }

    /// How many partitions are spread across the cluster.
    pub fn partitions_count(&self) -> u32 {
        self.0.partitions_count as u32
    }

    /// Configured replication factor for this cluster.
    pub fn replication_factor(&self) -> u32 {
        self.0.replication_factor as u32
    }

    /// gateway version
    pub fn gateway_version(&self) -> &str {
        &self.0.gateway_version
    }
}

/// Zeebe broker info
#[derive(Debug)]
pub struct BrokerInfo(proto::BrokerInfo);

impl BrokerInfo {
    /// Unique (within a cluster) node ID for the broker.
    pub fn node_id(&self) -> i32 {
        self.0.node_id
    }

    /// Hostname of the broker.
    pub fn host(&self) -> &str {
        &self.0.host
    }

    /// Port for the broker.
    pub fn port(&self) -> u32 {
        self.0.port as u32
    }

    /// List of partitions managed or replicated on this broker.
    pub fn partitions(&self) -> Vec<Partition> {
        self.0
            .partitions
            .iter()
            .map(|proto| Partition(proto.clone()))
            .collect()
    }

    /// Broker version.
    pub fn version(&self) -> &str {
        &self.0.version
    }
}

/// Zeebe partition.
#[derive(Debug)]
pub struct Partition(proto::Partition);

impl Partition {
    /// the unique ID of this partition
    pub fn partition_id(&self) -> i32 {
        self.0.partition_id
    }

    /// The role of the broker for this partition.
    pub fn role(&self) -> i32 {
        self.0.role
    }

    /// The health of this partition
    pub fn health(&self) -> i32 {
        self.0.health
    }
}