// rivven_cluster/partition.rs
1//! Partition types and state management
2
3use crate::node::NodeId;
4use serde::{Deserialize, Serialize};
5use std::collections::HashSet;
6
/// Unique partition identifier: a (topic, partition-index) pair.
///
/// Derives `Eq + Hash` so it can be used as a map/set key; see the
/// `Display` impl for the `topic/partition` rendering.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct PartitionId {
    /// Topic this partition belongs to.
    pub topic: String,
    /// Zero-based partition index within the topic.
    pub partition: u32,
}
13
14impl PartitionId {
15    pub fn new(topic: impl Into<String>, partition: u32) -> Self {
16        Self {
17            topic: topic.into(),
18            partition,
19        }
20    }
21}
22
23impl std::fmt::Display for PartitionId {
24    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
25        write!(f, "{}/{}", self.topic, self.partition)
26    }
27}
28
/// Lifecycle state of a single partition replica.
///
/// Serialized in lowercase (e.g. `insync`) via serde.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum ReplicaState {
    /// Replica is in sync with leader (eligible ISR member).
    InSync,
    /// Replica is replicating but still catching up to the leader.
    CatchingUp,
    /// Replica is offline
    Offline,
    /// Replica is being added (initial state of a newly created replica).
    Adding,
    /// Replica is being removed
    Removing,
}
44
/// Information about a partition replica as tracked in partition state.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicaInfo {
    /// Node hosting this replica
    pub node_id: NodeId,
    /// Replica state (new replicas start as `Adding`).
    pub state: ReplicaState,
    /// Log end offset (latest message)
    pub log_end_offset: u64,
    /// High watermark (committed) — kept in step with the partition HWM.
    pub high_watermark: u64,
    /// Lag behind leader, in offsets (leader LEO minus this replica's LEO).
    pub lag: u64,
}
59
60impl ReplicaInfo {
61    pub fn new(node_id: NodeId) -> Self {
62        Self {
63            node_id,
64            state: ReplicaState::Adding,
65            log_end_offset: 0,
66            high_watermark: 0,
67            lag: 0,
68        }
69    }
70
71    /// Check if replica is in sync
72    pub fn is_in_sync(&self) -> bool {
73        matches!(self.state, ReplicaState::InSync)
74    }
75}
76
/// Partition state and metadata: leadership, replica set, ISR and offsets.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PartitionState {
    /// Partition identifier
    pub id: PartitionId,

    /// Current leader node (`None` while the partition has no leader).
    pub leader: Option<NodeId>,

    /// Preferred leader (for rebalancing) — the first assigned replica.
    pub preferred_leader: NodeId,

    /// All replicas (ordered: leader first, then followers)
    pub replicas: Vec<ReplicaInfo>,

    /// In-sync replica set (ISR)
    pub isr: HashSet<NodeId>,

    /// Epoch for leader election (increments on leader change)
    pub leader_epoch: u64,

    /// High watermark (committed offset) — min LEO across the ISR.
    pub high_watermark: u64,

    /// Log start offset (after truncation)
    pub log_start_offset: u64,

    /// Is the partition online? True only while a leader is elected.
    pub online: bool,

    /// Is the partition under-replicated? True when ISR < replica count.
    pub under_replicated: bool,
}
110
111impl PartitionState {
112    /// Create new partition state
113    pub fn new(id: PartitionId, replicas: Vec<NodeId>) -> Self {
114        let preferred_leader = replicas.first().cloned().unwrap_or_default();
115        let replica_infos: Vec<_> = replicas
116            .iter()
117            .map(|n| ReplicaInfo::new(n.clone()))
118            .collect();
119        let isr: HashSet<_> = replicas.into_iter().collect();
120
121        // Partition is under-replicated if ISR size < replica count
122        let under_replicated = isr.len() < replica_infos.len();
123
124        Self {
125            id,
126            leader: None,
127            preferred_leader,
128            replicas: replica_infos,
129            isr,
130            leader_epoch: 0,
131            high_watermark: 0,
132            log_start_offset: 0,
133            online: false,
134            under_replicated,
135        }
136    }
137
138    /// Elect a new leader from ISR
139    pub fn elect_leader(&mut self) -> Option<&NodeId> {
140        // Prefer the preferred leader if in ISR
141        if self.isr.contains(&self.preferred_leader) {
142            self.leader = Some(self.preferred_leader.clone());
143        } else {
144            // Deterministic fallback: pick the lexicographically smallest ISR member
145            // so all nodes agree on the same leader for the same ISR state.
146            let mut sorted_isr: Vec<_> = self.isr.iter().collect();
147            sorted_isr.sort();
148            self.leader = sorted_isr.first().map(|n| (*n).clone());
149        }
150
151        if self.leader.is_some() {
152            self.leader_epoch += 1;
153            self.online = true;
154        }
155
156        self.leader.as_ref()
157    }
158
159    /// Add a node to ISR
160    pub fn add_to_isr(&mut self, node_id: &NodeId) {
161        self.isr.insert(node_id.clone());
162        self.update_under_replicated();
163
164        // Update replica state
165        if let Some(replica) = self.replicas.iter_mut().find(|r| &r.node_id == node_id) {
166            replica.state = ReplicaState::InSync;
167        }
168    }
169
170    /// Remove a node from ISR
171    pub fn remove_from_isr(&mut self, node_id: &NodeId) {
172        self.isr.remove(node_id);
173        self.update_under_replicated();
174
175        // Update replica state
176        if let Some(replica) = self.replicas.iter_mut().find(|r| &r.node_id == node_id) {
177            replica.state = ReplicaState::CatchingUp;
178        }
179
180        // If leader was removed, need new election
181        if self.leader.as_ref() == Some(node_id) {
182            self.leader = None;
183            self.online = false;
184        }
185    }
186
187    /// Update replica offset
188    pub fn update_replica_offset(&mut self, node_id: &NodeId, log_end_offset: u64) {
189        // First, get the leader's LEO if we need to calculate lag
190        let leader_leo = self.leader.as_ref().and_then(|leader| {
191            self.replicas
192                .iter()
193                .find(|r| &r.node_id == leader)
194                .map(|r| r.log_end_offset)
195        });
196
197        // Now update the replica
198        if let Some(replica) = self.replicas.iter_mut().find(|r| &r.node_id == node_id) {
199            replica.log_end_offset = log_end_offset;
200
201            // Calculate lag from leader
202            if let Some(leo) = leader_leo {
203                replica.lag = leo.saturating_sub(log_end_offset);
204            }
205        }
206    }
207
208    /// Advance high watermark
209    pub fn advance_high_watermark(&mut self) {
210        // HWM is the minimum LEO across all ISR members
211        let min_leo = self
212            .replicas
213            .iter()
214            .filter(|r| self.isr.contains(&r.node_id))
215            .map(|r| r.log_end_offset)
216            .min()
217            .unwrap_or(self.high_watermark);
218
219        if min_leo > self.high_watermark {
220            self.high_watermark = min_leo;
221
222            // Update all replica HWMs
223            for replica in &mut self.replicas {
224                replica.high_watermark = self.high_watermark;
225            }
226        }
227    }
228
229    /// Check if partition has enough replicas
230    fn update_under_replicated(&mut self) {
231        let expected = self.replicas.len();
232        let in_sync = self.isr.len();
233        self.under_replicated = in_sync < expected;
234    }
235
236    /// Get replica node IDs
237    pub fn replica_nodes(&self) -> Vec<&NodeId> {
238        self.replicas.iter().map(|r| &r.node_id).collect()
239    }
240
241    /// Get ISR nodes (deterministically sorted)
242    pub fn isr_nodes(&self) -> Vec<&NodeId> {
243        let mut nodes: Vec<_> = self.isr.iter().collect();
244        nodes.sort();
245        nodes
246    }
247
248    /// Check if a node is the leader
249    pub fn is_leader(&self, node_id: &NodeId) -> bool {
250        self.leader.as_ref() == Some(node_id)
251    }
252
253    /// Check if a node is a replica
254    pub fn is_replica(&self, node_id: &NodeId) -> bool {
255        self.replicas.iter().any(|r| &r.node_id == node_id)
256    }
257
258    /// Get replication factor
259    pub fn replication_factor(&self) -> usize {
260        self.replicas.len()
261    }
262}
263
/// Topic configuration
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct TopicConfig {
    /// Topic name
    pub name: String,

    /// Number of partitions
    pub partitions: u32,

    /// Replication factor (replicas per partition)
    pub replication_factor: u16,

    /// Retention period in milliseconds (default: 7 days)
    pub retention_ms: u64,

    /// Segment size in bytes (default: 1 GiB)
    pub segment_bytes: u64,

    /// Minimum ISR required for writes (default: 1)
    pub min_isr: u16,

    /// Custom configuration key/value overrides
    pub config: std::collections::HashMap<String, String>,
}
288
289impl TopicConfig {
290    pub fn new(name: impl Into<String>, partitions: u32, replication_factor: u16) -> Self {
291        Self {
292            name: name.into(),
293            partitions,
294            replication_factor,
295            retention_ms: 7 * 24 * 60 * 60 * 1000, // 7 days
296            segment_bytes: 1024 * 1024 * 1024,     // 1 GB
297            min_isr: 1,
298            config: std::collections::HashMap::new(),
299        }
300    }
301
302    pub fn with_retention_ms(mut self, ms: u64) -> Self {
303        self.retention_ms = ms;
304        self
305    }
306
307    pub fn with_min_isr(mut self, min_isr: u16) -> Self {
308        self.min_isr = min_isr;
309        self
310    }
311}
312
/// Topic state including all partitions
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TopicState {
    /// Topic configuration
    pub config: TopicConfig,

    /// Partition states, indexed by partition number
    pub partitions: Vec<PartitionState>,
}
322
323impl TopicState {
324    pub fn new(config: TopicConfig, partition_assignments: Vec<Vec<NodeId>>) -> Self {
325        let partitions = partition_assignments
326            .into_iter()
327            .enumerate()
328            .map(|(i, replicas)| {
329                let mut state =
330                    PartitionState::new(PartitionId::new(&config.name, i as u32), replicas);
331                // Automatically elect leader from ISR (first replica is preferred)
332                state.elect_leader();
333                state
334            })
335            .collect();
336
337        Self { config, partitions }
338    }
339
340    /// Get partition by index
341    pub fn partition(&self, idx: u32) -> Option<&PartitionState> {
342        self.partitions.get(idx as usize)
343    }
344
345    /// Get mutable partition by index
346    pub fn partition_mut(&mut self, idx: u32) -> Option<&mut PartitionState> {
347        self.partitions.get_mut(idx as usize)
348    }
349
350    /// Check if all partitions have a leader
351    pub fn is_fully_online(&self) -> bool {
352        self.partitions.iter().all(|p| p.online)
353    }
354
355    /// Check if any partition is under-replicated
356    pub fn is_under_replicated(&self) -> bool {
357        self.partitions.iter().any(|p| p.under_replicated)
358    }
359
360    /// Get offline partitions
361    pub fn offline_partitions(&self) -> Vec<u32> {
362        self.partitions
363            .iter()
364            .enumerate()
365            .filter(|(_, p)| !p.online)
366            .map(|(i, _)| i as u32)
367            .collect()
368    }
369}
370
#[cfg(test)]
mod tests {
    use super::*;

    /// Build partition "test-topic/0" replicated across the given nodes.
    fn make_partition(nodes: &[&str]) -> PartitionState {
        let replicas: Vec<NodeId> = nodes.iter().map(|n| n.to_string()).collect();
        PartitionState::new(PartitionId::new("test-topic", 0), replicas)
    }

    #[test]
    fn test_partition_leader_election() {
        let mut partition = make_partition(&["node-1", "node-2", "node-3"]);

        // No leader until an election runs.
        assert!(partition.leader.is_none());
        assert!(!partition.online);

        // The preferred leader (first replica) wins the first election.
        let elected = partition.elect_leader();
        assert!(elected.is_some());
        assert_eq!(elected.unwrap(), "node-1");
        assert!(partition.online);
        assert_eq!(partition.leader_epoch, 1);
    }

    #[test]
    fn test_isr_management() {
        let mut partition = make_partition(&["node-1", "node-2", "node-3"]);
        partition.elect_leader();

        // Every replica starts in the ISR, so nothing is under-replicated.
        assert_eq!(partition.isr.len(), 3);
        assert!(!partition.under_replicated);

        // Dropping node-2 from the ISR flags under-replication.
        partition.remove_from_isr(&"node-2".to_string());
        assert_eq!(partition.isr.len(), 2);
        assert!(partition.under_replicated);

        // Re-adding node-2 restores full replication.
        partition.add_to_isr(&"node-2".to_string());
        assert_eq!(partition.isr.len(), 3);
        assert!(!partition.under_replicated);
    }

    #[test]
    fn test_high_watermark_advancement() {
        let mut partition = make_partition(&["node-1", "node-2"]);
        partition.elect_leader();

        // Leader at offset 100, follower trailing at 80.
        partition.update_replica_offset(&"node-1".to_string(), 100);
        partition.update_replica_offset(&"node-2".to_string(), 80);

        // The HWM only advances as far as the slowest ISR member.
        partition.advance_high_watermark();
        assert_eq!(partition.high_watermark, 80);

        // Once node-2 catches up, the HWM reaches the leader's LEO.
        partition.update_replica_offset(&"node-2".to_string(), 100);
        partition.advance_high_watermark();
        assert_eq!(partition.high_watermark, 100);
    }
}