rivven_protocol/
metadata.rs1use serde::{Deserialize, Serialize};
4
/// Topic names that are always classified as internal, regardless of prefix.
const INTERNAL_TOPICS: &[&str] = &["__consumer_offsets", "__transaction_state", "_schemas"];

/// Reports whether `name` denotes an internal topic.
///
/// A name is internal when it starts with a double underscore (`__`) or when
/// it appears verbatim in [`INTERNAL_TOPICS`]. The explicit list is what makes
/// `_schemas` internal, since it has only a single leading underscore.
pub fn is_internal_topic(name: &str) -> bool {
    let has_reserved_prefix = name.starts_with("__");
    has_reserved_prefix || INTERNAL_TOPICS.iter().any(|&known| known == name)
}
16
17#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
19pub struct BrokerInfo {
20 pub node_id: String,
22 pub host: String,
24 pub port: u16,
26 pub rack: Option<String>,
28}
29
30impl BrokerInfo {
31 pub fn new(node_id: impl Into<String>, host: impl Into<String>, port: u16) -> Self {
33 Self {
34 node_id: node_id.into(),
35 host: host.into(),
36 port,
37 rack: None,
38 }
39 }
40
41 pub fn with_rack(mut self, rack: impl Into<String>) -> Self {
43 self.rack = Some(rack.into());
44 self
45 }
46
47 pub fn address(&self) -> String {
49 format!("{}:{}", self.host, self.port)
50 }
51}
52
53#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
55pub struct TopicMetadata {
56 pub name: String,
58 pub is_internal: bool,
60 pub partitions: Vec<PartitionMetadata>,
62}
63
64impl TopicMetadata {
65 pub fn new(name: impl Into<String>, partitions: Vec<PartitionMetadata>) -> Self {
67 let name = name.into();
68 let is_internal = is_internal_topic(&name);
69 Self {
70 name,
71 is_internal,
72 partitions,
73 }
74 }
75
76 pub fn partition_count(&self) -> usize {
78 self.partitions.len()
79 }
80
81 pub fn has_offline_partitions(&self) -> bool {
83 self.partitions.iter().any(|p| p.offline)
84 }
85
86 pub fn partition_leader(&self, partition: u32) -> Option<&str> {
88 self.partitions
89 .iter()
90 .find(|p| p.partition == partition)
91 .and_then(|p| p.leader.as_deref())
92 }
93}
94
95#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
97pub struct PartitionMetadata {
98 pub partition: u32,
100 pub leader: Option<String>,
102 pub replicas: Vec<String>,
104 pub isr: Vec<String>,
106 pub offline: bool,
108}
109
110impl PartitionMetadata {
111 pub fn new(partition: u32) -> Self {
113 Self {
114 partition,
115 leader: None,
116 replicas: Vec::new(),
117 isr: Vec::new(),
118 offline: true,
119 }
120 }
121
122 pub fn with_leader(mut self, leader: impl Into<String>) -> Self {
124 self.leader = Some(leader.into());
125 self.offline = false;
126 self
127 }
128
129 pub fn with_replicas(mut self, replicas: Vec<String>) -> Self {
131 self.replicas = replicas;
132 self
133 }
134
135 pub fn with_isr(mut self, isr: Vec<String>) -> Self {
137 self.isr = isr;
138 self
139 }
140
141 pub fn is_under_replicated(&self) -> bool {
143 self.isr.len() < self.replicas.len()
144 }
145}
146
#[cfg(test)]
mod tests {
    use super::*;

    /// Constructor plus rack builder populate the fields, and `address()`
    /// joins host and port.
    #[test]
    fn test_broker_info() {
        let info = BrokerInfo::new("node1", "localhost", 9092).with_rack("rack1");

        assert_eq!(info.node_id, "node1");
        assert_eq!(info.address(), "localhost:9092");
        assert_eq!(info.rack.as_deref(), Some("rack1"));
    }

    /// `TopicMetadata::new` derives the internal flag from the topic name.
    #[test]
    fn test_topic_metadata_internal() {
        let schemas = TopicMetadata::new("_schemas", Vec::new());
        assert!(schemas.is_internal);

        let events = TopicMetadata::new("events", Vec::new());
        assert!(!events.is_internal);
    }

    /// A partition with a leader is online; one ISR entry against two
    /// replicas counts as under-replicated.
    #[test]
    fn test_partition_metadata() {
        let replicas = vec![String::from("node1"), String::from("node2")];
        let isr = vec![String::from("node1")];

        let meta = PartitionMetadata::new(0)
            .with_leader("node1")
            .with_replicas(replicas)
            .with_isr(isr);

        assert!(!meta.offline);
        assert!(meta.is_under_replicated());
        assert_eq!(meta.leader.as_deref(), Some("node1"));
    }
}