Skip to main content

rivven_protocol/
metadata.rs

1//! Cluster and topic metadata types
2
3use serde::{Deserialize, Serialize};
4
5/// Broker/node information for metadata discovery
6#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
7pub struct BrokerInfo {
8    /// Node ID
9    pub node_id: String,
10    /// Host for client connections
11    pub host: String,
12    /// Port for client connections
13    pub port: u16,
14    /// Optional rack ID for rack-aware placement
15    pub rack: Option<String>,
16}
17
18impl BrokerInfo {
19    /// Create a new broker info
20    pub fn new(node_id: impl Into<String>, host: impl Into<String>, port: u16) -> Self {
21        Self {
22            node_id: node_id.into(),
23            host: host.into(),
24            port,
25            rack: None,
26        }
27    }
28
29    /// Set rack ID
30    pub fn with_rack(mut self, rack: impl Into<String>) -> Self {
31        self.rack = Some(rack.into());
32        self
33    }
34
35    /// Get the address string (host:port)
36    pub fn address(&self) -> String {
37        format!("{}:{}", self.host, self.port)
38    }
39}
40
41/// Topic metadata for cluster discovery
42#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
43pub struct TopicMetadata {
44    /// Topic name
45    pub name: String,
46    /// Is the topic internal (e.g., __consumer_offsets, _schemas)
47    pub is_internal: bool,
48    /// Partition metadata
49    pub partitions: Vec<PartitionMetadata>,
50}
51
52impl TopicMetadata {
53    /// Create a new topic metadata
54    pub fn new(name: impl Into<String>, partitions: Vec<PartitionMetadata>) -> Self {
55        let name = name.into();
56        let is_internal = name.starts_with('_');
57        Self {
58            name,
59            is_internal,
60            partitions,
61        }
62    }
63
64    /// Get partition count
65    pub fn partition_count(&self) -> usize {
66        self.partitions.len()
67    }
68
69    /// Check if topic has any offline partitions
70    pub fn has_offline_partitions(&self) -> bool {
71        self.partitions.iter().any(|p| p.offline)
72    }
73
74    /// Get the leader node ID for a partition
75    pub fn partition_leader(&self, partition: u32) -> Option<&str> {
76        self.partitions
77            .iter()
78            .find(|p| p.partition == partition)
79            .and_then(|p| p.leader.as_deref())
80    }
81}
82
83/// Partition metadata for cluster discovery
84#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
85pub struct PartitionMetadata {
86    /// Partition ID
87    pub partition: u32,
88    /// Leader node ID (None if no leader)
89    pub leader: Option<String>,
90    /// Replica node IDs
91    pub replicas: Vec<String>,
92    /// ISR (in-sync replica) node IDs
93    pub isr: Vec<String>,
94    /// Is offline (no leader available)
95    pub offline: bool,
96}
97
98impl PartitionMetadata {
99    /// Create a new partition metadata
100    pub fn new(partition: u32) -> Self {
101        Self {
102            partition,
103            leader: None,
104            replicas: Vec::new(),
105            isr: Vec::new(),
106            offline: true,
107        }
108    }
109
110    /// Set the leader
111    pub fn with_leader(mut self, leader: impl Into<String>) -> Self {
112        self.leader = Some(leader.into());
113        self.offline = false;
114        self
115    }
116
117    /// Add replicas
118    pub fn with_replicas(mut self, replicas: Vec<String>) -> Self {
119        self.replicas = replicas;
120        self
121    }
122
123    /// Add ISR
124    pub fn with_isr(mut self, isr: Vec<String>) -> Self {
125        self.isr = isr;
126        self
127    }
128
129    /// Check if partition is under-replicated
130    pub fn is_under_replicated(&self) -> bool {
131        self.isr.len() < self.replicas.len()
132    }
133}
134
#[cfg(test)]
mod tests {
    use super::*;

    /// A broker built with a rack exposes its ID, address, and rack.
    #[test]
    fn test_broker_info() {
        let with_rack = BrokerInfo::new("node1", "localhost", 9092).with_rack("rack1");

        assert_eq!("node1", with_rack.node_id);
        assert_eq!("localhost:9092", with_rack.address());
        assert_eq!(Some("rack1"), with_rack.rack.as_deref());
    }

    /// Only underscore-prefixed topic names are flagged as internal.
    #[test]
    fn test_topic_metadata_internal() {
        let internal = TopicMetadata::new("_schemas", Vec::new());
        let regular = TopicMetadata::new("events", Vec::new());

        assert!(internal.is_internal);
        assert!(!regular.is_internal);
    }

    /// A partition with a leader and a short ISR is online but under-replicated.
    #[test]
    fn test_partition_metadata() {
        let replica_ids = vec![String::from("node1"), String::from("node2")];
        let in_sync_ids = vec![String::from("node1")];

        let described = PartitionMetadata::new(0)
            .with_leader("node1")
            .with_replicas(replica_ids)
            .with_isr(in_sync_ids);

        assert!(!described.offline);
        assert!(described.is_under_replicated());
        assert_eq!(Some("node1"), described.leader.as_deref());
    }
}