rivven_protocol/
metadata.rs1use serde::{Deserialize, Serialize};
4
5#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
7pub struct BrokerInfo {
8 pub node_id: String,
10 pub host: String,
12 pub port: u16,
14 pub rack: Option<String>,
16}
17
18impl BrokerInfo {
19 pub fn new(node_id: impl Into<String>, host: impl Into<String>, port: u16) -> Self {
21 Self {
22 node_id: node_id.into(),
23 host: host.into(),
24 port,
25 rack: None,
26 }
27 }
28
29 pub fn with_rack(mut self, rack: impl Into<String>) -> Self {
31 self.rack = Some(rack.into());
32 self
33 }
34
35 pub fn address(&self) -> String {
37 format!("{}:{}", self.host, self.port)
38 }
39}
40
41#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
43pub struct TopicMetadata {
44 pub name: String,
46 pub is_internal: bool,
48 pub partitions: Vec<PartitionMetadata>,
50}
51
52impl TopicMetadata {
53 pub fn new(name: impl Into<String>, partitions: Vec<PartitionMetadata>) -> Self {
55 let name = name.into();
56 let is_internal = name.starts_with('_');
57 Self {
58 name,
59 is_internal,
60 partitions,
61 }
62 }
63
64 pub fn partition_count(&self) -> usize {
66 self.partitions.len()
67 }
68
69 pub fn has_offline_partitions(&self) -> bool {
71 self.partitions.iter().any(|p| p.offline)
72 }
73
74 pub fn partition_leader(&self, partition: u32) -> Option<&str> {
76 self.partitions
77 .iter()
78 .find(|p| p.partition == partition)
79 .and_then(|p| p.leader.as_deref())
80 }
81}
82
83#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
85pub struct PartitionMetadata {
86 pub partition: u32,
88 pub leader: Option<String>,
90 pub replicas: Vec<String>,
92 pub isr: Vec<String>,
94 pub offline: bool,
96}
97
98impl PartitionMetadata {
99 pub fn new(partition: u32) -> Self {
101 Self {
102 partition,
103 leader: None,
104 replicas: Vec::new(),
105 isr: Vec::new(),
106 offline: true,
107 }
108 }
109
110 pub fn with_leader(mut self, leader: impl Into<String>) -> Self {
112 self.leader = Some(leader.into());
113 self.offline = false;
114 self
115 }
116
117 pub fn with_replicas(mut self, replicas: Vec<String>) -> Self {
119 self.replicas = replicas;
120 self
121 }
122
123 pub fn with_isr(mut self, isr: Vec<String>) -> Self {
125 self.isr = isr;
126 self
127 }
128
129 pub fn is_under_replicated(&self) -> bool {
131 self.isr.len() < self.replicas.len()
132 }
133}
134
135#[cfg(test)]
136mod tests {
137 use super::*;
138
139 #[test]
140 fn test_broker_info() {
141 let broker = BrokerInfo::new("node1", "localhost", 9092).with_rack("rack1");
142
143 assert_eq!(broker.node_id, "node1");
144 assert_eq!(broker.address(), "localhost:9092");
145 assert_eq!(broker.rack, Some("rack1".to_string()));
146 }
147
148 #[test]
149 fn test_topic_metadata_internal() {
150 let topic = TopicMetadata::new("_schemas", vec![]);
151 assert!(topic.is_internal);
152
153 let topic = TopicMetadata::new("events", vec![]);
154 assert!(!topic.is_internal);
155 }
156
157 #[test]
158 fn test_partition_metadata() {
159 let partition = PartitionMetadata::new(0)
160 .with_leader("node1")
161 .with_replicas(vec!["node1".to_string(), "node2".to_string()])
162 .with_isr(vec!["node1".to_string()]);
163
164 assert!(!partition.offline);
165 assert!(partition.is_under_replicated());
166 assert_eq!(partition.leader, Some("node1".to_string()));
167 }
168}