Skip to main content

rivven_protocol/
metadata.rs

1//! Cluster and topic metadata types
2
3use serde::{Deserialize, Serialize};
4
/// Explicitly registered internal topic names (Kafka-style list).
///
/// `_schemas` carries only a single leading underscore, so it is recognised
/// solely through this list; the other entries would also match the `__`
/// prefix rule applied by `is_internal_topic`.
const INTERNAL_TOPICS: &[&str] = &["__consumer_offsets", "__transaction_state", "_schemas"];

/// Report whether `name` designates an internal system topic.
///
/// A topic is internal when its name starts with a double underscore
/// (`__`) or when it appears verbatim in `INTERNAL_TOPICS`.  Every other
/// name, including single-underscore names such as `_audit_log`, is
/// treated as a user topic.
pub fn is_internal_topic(name: &str) -> bool {
    // Prefix convention first: any `__`-prefixed name is internal.
    if name.starts_with("__") {
        return true;
    }
    // Otherwise only an exact match against the explicit list counts.
    INTERNAL_TOPICS.iter().any(|known| *known == name)
}
16
/// Broker/node information for metadata discovery.
///
/// Describes a single node by its ID and the host/port that clients
/// connect to.  Build one with [`BrokerInfo::new`] and optionally attach a
/// rack with [`BrokerInfo::with_rack`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct BrokerInfo {
    /// Node ID
    pub node_id: String,
    /// Host for client connections
    pub host: String,
    /// Port for client connections
    pub port: u16,
    /// Optional rack ID for rack-aware placement (`None` when no rack is set)
    pub rack: Option<String>,
}
29
30impl BrokerInfo {
31    /// Create a new broker info
32    pub fn new(node_id: impl Into<String>, host: impl Into<String>, port: u16) -> Self {
33        Self {
34            node_id: node_id.into(),
35            host: host.into(),
36            port,
37            rack: None,
38        }
39    }
40
41    /// Set rack ID
42    pub fn with_rack(mut self, rack: impl Into<String>) -> Self {
43        self.rack = Some(rack.into());
44        self
45    }
46
47    /// Get the address string (host:port)
48    pub fn address(&self) -> String {
49        format!("{}:{}", self.host, self.port)
50    }
51}
52
/// Topic metadata for cluster discovery.
///
/// Pairs a topic name with the metadata of its partitions.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct TopicMetadata {
    /// Topic name
    pub name: String,
    /// Is the topic internal (e.g., `__consumer_offsets`, `_schemas`).
    /// [`TopicMetadata::new`] computes this from the name via
    /// [`is_internal_topic`].
    pub is_internal: bool,
    /// Partition metadata entries for this topic
    pub partitions: Vec<PartitionMetadata>,
}
63
64impl TopicMetadata {
65    /// Create a new topic metadata
66    pub fn new(name: impl Into<String>, partitions: Vec<PartitionMetadata>) -> Self {
67        let name = name.into();
68        let is_internal = is_internal_topic(&name);
69        Self {
70            name,
71            is_internal,
72            partitions,
73        }
74    }
75
76    /// Get partition count
77    pub fn partition_count(&self) -> usize {
78        self.partitions.len()
79    }
80
81    /// Check if topic has any offline partitions
82    pub fn has_offline_partitions(&self) -> bool {
83        self.partitions.iter().any(|p| p.offline)
84    }
85
86    /// Get the leader node ID for a partition
87    pub fn partition_leader(&self, partition: u32) -> Option<&str> {
88        self.partitions
89            .iter()
90            .find(|p| p.partition == partition)
91            .and_then(|p| p.leader.as_deref())
92    }
93}
94
/// Partition metadata for cluster discovery.
///
/// Records the leader, replica set and in-sync replica set of a single
/// partition, identified by its partition ID.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct PartitionMetadata {
    /// Partition ID
    pub partition: u32,
    /// Leader node ID (None if no leader)
    pub leader: Option<String>,
    /// Replica node IDs
    pub replicas: Vec<String>,
    /// ISR (in-sync replica) node IDs
    pub isr: Vec<String>,
    /// Is offline (no leader available).  [`PartitionMetadata::new`] starts
    /// with `true` and [`PartitionMetadata::with_leader`] clears it; the
    /// field is public, so it can also be set independently of `leader`.
    pub offline: bool,
}
109
110impl PartitionMetadata {
111    /// Create a new partition metadata
112    pub fn new(partition: u32) -> Self {
113        Self {
114            partition,
115            leader: None,
116            replicas: Vec::new(),
117            isr: Vec::new(),
118            offline: true,
119        }
120    }
121
122    /// Set the leader
123    pub fn with_leader(mut self, leader: impl Into<String>) -> Self {
124        self.leader = Some(leader.into());
125        self.offline = false;
126        self
127    }
128
129    /// Add replicas
130    pub fn with_replicas(mut self, replicas: Vec<String>) -> Self {
131        self.replicas = replicas;
132        self
133    }
134
135    /// Add ISR
136    pub fn with_isr(mut self, isr: Vec<String>) -> Self {
137        self.isr = isr;
138        self
139    }
140
141    /// Check if partition is under-replicated
142    pub fn is_under_replicated(&self) -> bool {
143        self.isr.len() < self.replicas.len()
144    }
145}
146
#[cfg(test)]
mod tests {
    use super::*;

    /// The builder records the rack and `address` joins host and port.
    #[test]
    fn test_broker_info() {
        let rack_aware = BrokerInfo::new("node1", "localhost", 9092).with_rack("rack1");

        assert_eq!(rack_aware.node_id.as_str(), "node1");
        assert_eq!(rack_aware.address().as_str(), "localhost:9092");
        assert_eq!(rack_aware.rack.as_deref(), Some("rack1"));
    }

    /// `TopicMetadata::new` flags `_schemas` as internal and `events` as a user topic.
    #[test]
    fn test_topic_metadata_internal() {
        let schemas = TopicMetadata::new("_schemas", Vec::new());
        assert_eq!(schemas.is_internal, true);

        let events = TopicMetadata::new("events", Vec::new());
        assert_eq!(events.is_internal, false);
    }

    /// A partition with a leader is online; one ISR member out of two
    /// replicas makes it under-replicated.
    #[test]
    fn test_partition_metadata() {
        let replicas: Vec<String> = ["node1", "node2"].iter().map(|id| id.to_string()).collect();
        let in_sync: Vec<String> = vec![String::from("node1")];

        let meta = PartitionMetadata::new(0)
            .with_leader("node1")
            .with_replicas(replicas)
            .with_isr(in_sync);

        assert_eq!(meta.offline, false);
        assert!(meta.is_under_replicated());
        assert_eq!(meta.leader.as_deref(), Some("node1"));
    }
}