Skip to main content

mqdb_core/partition/
map.rs

// Copyright 2025-2026 LabOverWire. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
3
4use super::{Epoch, NUM_PARTITIONS, NodeId, PartitionId};
5
/// The role a node holds for a single partition.
///
/// Derives `Hash` in addition to the comparison traits so a role can be used
/// directly as a key in hashed collections (for example, to bucket partitions
/// by role).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum PartitionRole {
    /// The node is neither the primary nor a replica of the partition.
    None,
    /// The node is the partition's primary.
    Primary,
    /// The node is listed among the partition's replicas.
    Replica,
}
12
13#[derive(Debug, Clone, PartialEq)]
14pub struct PartitionAssignment {
15    pub primary: Option<NodeId>,
16    pub replicas: Vec<NodeId>,
17    pub epoch: Epoch,
18}
19
20impl Default for PartitionAssignment {
21    fn default() -> Self {
22        Self {
23            primary: None,
24            replicas: Vec::new(),
25            epoch: Epoch::ZERO,
26        }
27    }
28}
29
30impl PartitionAssignment {
31    #[allow(clippy::must_use_candidate)]
32    pub fn new(primary: NodeId, replicas: Vec<NodeId>, epoch: Epoch) -> Self {
33        Self {
34            primary: Some(primary),
35            replicas,
36            epoch,
37        }
38    }
39
40    #[must_use]
41    pub fn role_for(&self, node: NodeId) -> PartitionRole {
42        if self.primary == Some(node) {
43            PartitionRole::Primary
44        } else if self.replicas.contains(&node) {
45            PartitionRole::Replica
46        } else {
47            PartitionRole::None
48        }
49    }
50
51    #[must_use]
52    pub fn all_nodes(&self) -> Vec<NodeId> {
53        let mut nodes = self.replicas.clone();
54        if let Some(primary) = self.primary {
55            nodes.insert(0, primary);
56        }
57        nodes
58    }
59}
60
61#[derive(Debug, Clone, PartialEq)]
62pub struct PartitionMap {
63    version: u64,
64    assignments: [PartitionAssignment; NUM_PARTITIONS as usize],
65}
66
67impl Default for PartitionMap {
68    fn default() -> Self {
69        Self::new()
70    }
71}
72
73impl PartitionMap {
74    #[allow(clippy::must_use_candidate)]
75    pub fn new() -> Self {
76        Self {
77            version: 0,
78            assignments: std::array::from_fn(|_| PartitionAssignment::default()),
79        }
80    }
81
82    #[must_use]
83    pub fn version(&self) -> u64 {
84        self.version
85    }
86
87    #[must_use]
88    pub fn get(&self, partition: PartitionId) -> &PartitionAssignment {
89        &self.assignments[partition.get() as usize]
90    }
91
92    pub fn set(&mut self, partition: PartitionId, assignment: PartitionAssignment) {
93        self.assignments[partition.get() as usize] = assignment;
94        self.version += 1;
95    }
96
97    #[must_use]
98    pub fn primary(&self, partition: PartitionId) -> Option<NodeId> {
99        self.get(partition).primary
100    }
101
102    #[must_use]
103    pub fn replicas(&self, partition: PartitionId) -> &[NodeId] {
104        &self.get(partition).replicas
105    }
106
107    #[must_use]
108    pub fn epoch(&self, partition: PartitionId) -> Epoch {
109        self.get(partition).epoch
110    }
111
112    #[must_use]
113    pub fn role_for(&self, partition: PartitionId, node: NodeId) -> PartitionRole {
114        self.get(partition).role_for(node)
115    }
116
117    #[must_use]
118    pub fn partitions_for_node(&self, node: NodeId) -> Vec<(PartitionId, PartitionRole)> {
119        PartitionId::all()
120            .filter_map(|p| {
121                let role = self.role_for(p, node);
122                if role == PartitionRole::None {
123                    None
124                } else {
125                    Some((p, role))
126                }
127            })
128            .collect()
129    }
130
131    #[must_use]
132    pub fn primary_count(&self, node: NodeId) -> usize {
133        PartitionId::all()
134            .filter(|p| self.primary(*p) == Some(node))
135            .count()
136    }
137
138    #[must_use]
139    pub fn replica_count(&self, node: NodeId) -> usize {
140        PartitionId::all()
141            .filter(|p| self.replicas(*p).contains(&node))
142            .count()
143    }
144
145    #[must_use]
146    pub fn has_any_assignment(&self, node: NodeId) -> bool {
147        PartitionId::all().any(|p| self.role_for(p, node) != PartitionRole::None)
148    }
149}
150
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `NodeId` from a raw id, panicking if validation rejects it.
    fn node(id: u16) -> NodeId {
        NodeId::validated(id).unwrap()
    }

    /// Builds a `PartitionId` from a raw id, panicking if it is rejected.
    fn partition(id: u16) -> PartitionId {
        PartitionId::new(id).unwrap()
    }

    /// A fresh map starts at version 0 with no primary on partition 0.
    #[test]
    fn partition_map_default_empty() {
        let fresh = PartitionMap::new();

        assert_eq!(fresh.version(), 0);
        assert!(fresh.primary(partition(0)).is_none());
    }

    /// `set` stores the assignment and bumps the version; the accessors read
    /// back what was stored.
    #[test]
    fn partition_assignment() {
        let p0 = partition(0);
        let mut map = PartitionMap::new();
        map.set(
            p0,
            PartitionAssignment::new(node(1), vec![node(2), node(3)], Epoch::new(1)),
        );

        assert_eq!(map.primary(p0), Some(node(1)));
        assert_eq!(map.replicas(p0), &[node(2), node(3)]);
        assert_eq!(map.epoch(p0), Epoch::new(1));
        assert_eq!(map.version(), 1);
    }

    /// Primary, replica, and unrelated nodes each get the matching role.
    #[test]
    fn role_for_node() {
        let p0 = partition(0);
        let mut map = PartitionMap::new();
        map.set(
            p0,
            PartitionAssignment::new(node(1), vec![node(2), node(3)], Epoch::new(1)),
        );

        let expected = [
            (1, PartitionRole::Primary),
            (2, PartitionRole::Replica),
            (3, PartitionRole::Replica),
            (4, PartitionRole::None),
        ];
        for (id, role) in expected {
            assert_eq!(map.role_for(p0, node(id)), role);
        }
    }

    /// Node 1 is primary of partitions 0 and 2 and a replica of partition 1.
    #[test]
    fn partitions_for_node() {
        let mut map = PartitionMap::new();

        // (partition, primary, single replica)
        for (pid, primary, replica) in [(0, 1, 2), (1, 2, 1), (2, 1, 3)] {
            map.set(
                partition(pid),
                PartitionAssignment::new(node(primary), vec![node(replica)], Epoch::new(1)),
            );
        }

        let owned_by_node1 = map.partitions_for_node(node(1));
        assert_eq!(owned_by_node1.len(), 3);
        assert_eq!(map.primary_count(node(1)), 2);
        assert_eq!(map.replica_count(node(1)), 1);
    }
}