// mqdb_core/partition/map.rs
use super::{Epoch, NUM_PARTITIONS, NodeId, PartitionId};
5
/// Role a node holds for a single partition.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PartitionRole {
    /// The node is neither the primary nor one of the replicas.
    None,
    /// The node is the partition's primary.
    Primary,
    /// The node is listed among the partition's replicas.
    Replica,
}
12
13#[derive(Debug, Clone, PartialEq)]
14pub struct PartitionAssignment {
15 pub primary: Option<NodeId>,
16 pub replicas: Vec<NodeId>,
17 pub epoch: Epoch,
18}
19
20impl Default for PartitionAssignment {
21 fn default() -> Self {
22 Self {
23 primary: None,
24 replicas: Vec::new(),
25 epoch: Epoch::ZERO,
26 }
27 }
28}
29
30impl PartitionAssignment {
31 #[allow(clippy::must_use_candidate)]
32 pub fn new(primary: NodeId, replicas: Vec<NodeId>, epoch: Epoch) -> Self {
33 Self {
34 primary: Some(primary),
35 replicas,
36 epoch,
37 }
38 }
39
40 #[must_use]
41 pub fn role_for(&self, node: NodeId) -> PartitionRole {
42 if self.primary == Some(node) {
43 PartitionRole::Primary
44 } else if self.replicas.contains(&node) {
45 PartitionRole::Replica
46 } else {
47 PartitionRole::None
48 }
49 }
50
51 #[must_use]
52 pub fn all_nodes(&self) -> Vec<NodeId> {
53 let mut nodes = self.replicas.clone();
54 if let Some(primary) = self.primary {
55 nodes.insert(0, primary);
56 }
57 nodes
58 }
59}
60
61#[derive(Debug, Clone, PartialEq)]
62pub struct PartitionMap {
63 version: u64,
64 assignments: [PartitionAssignment; NUM_PARTITIONS as usize],
65}
66
67impl Default for PartitionMap {
68 fn default() -> Self {
69 Self::new()
70 }
71}
72
73impl PartitionMap {
74 #[allow(clippy::must_use_candidate)]
75 pub fn new() -> Self {
76 Self {
77 version: 0,
78 assignments: std::array::from_fn(|_| PartitionAssignment::default()),
79 }
80 }
81
82 #[must_use]
83 pub fn version(&self) -> u64 {
84 self.version
85 }
86
87 #[must_use]
88 pub fn get(&self, partition: PartitionId) -> &PartitionAssignment {
89 &self.assignments[partition.get() as usize]
90 }
91
92 pub fn set(&mut self, partition: PartitionId, assignment: PartitionAssignment) {
93 self.assignments[partition.get() as usize] = assignment;
94 self.version += 1;
95 }
96
97 #[must_use]
98 pub fn primary(&self, partition: PartitionId) -> Option<NodeId> {
99 self.get(partition).primary
100 }
101
102 #[must_use]
103 pub fn replicas(&self, partition: PartitionId) -> &[NodeId] {
104 &self.get(partition).replicas
105 }
106
107 #[must_use]
108 pub fn epoch(&self, partition: PartitionId) -> Epoch {
109 self.get(partition).epoch
110 }
111
112 #[must_use]
113 pub fn role_for(&self, partition: PartitionId, node: NodeId) -> PartitionRole {
114 self.get(partition).role_for(node)
115 }
116
117 #[must_use]
118 pub fn partitions_for_node(&self, node: NodeId) -> Vec<(PartitionId, PartitionRole)> {
119 PartitionId::all()
120 .filter_map(|p| {
121 let role = self.role_for(p, node);
122 if role == PartitionRole::None {
123 None
124 } else {
125 Some((p, role))
126 }
127 })
128 .collect()
129 }
130
131 #[must_use]
132 pub fn primary_count(&self, node: NodeId) -> usize {
133 PartitionId::all()
134 .filter(|p| self.primary(*p) == Some(node))
135 .count()
136 }
137
138 #[must_use]
139 pub fn replica_count(&self, node: NodeId) -> usize {
140 PartitionId::all()
141 .filter(|p| self.replicas(*p).contains(&node))
142 .count()
143 }
144
145 #[must_use]
146 pub fn has_any_assignment(&self, node: NodeId) -> bool {
147 PartitionId::all().any(|p| self.role_for(p, node) != PartitionRole::None)
148 }
149}
150
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `NodeId` from a raw id, panicking if validation rejects it.
    fn node(id: u16) -> NodeId {
        NodeId::validated(id).unwrap()
    }

    /// Builds a `PartitionId` from a raw id, panicking if it is rejected.
    fn partition(id: u16) -> PartitionId {
        PartitionId::new(id).unwrap()
    }

    #[test]
    fn partition_map_default_empty() {
        let map = PartitionMap::new();
        assert_eq!(map.version(), 0);
        assert!(map.primary(partition(0)).is_none());
    }

    #[test]
    fn partition_assignment() {
        let mut map = PartitionMap::new();

        let assignment = PartitionAssignment::new(node(1), vec![node(2), node(3)], Epoch::new(1));

        map.set(partition(0), assignment);

        assert_eq!(map.primary(partition(0)), Some(node(1)));
        assert_eq!(map.replicas(partition(0)), &[node(2), node(3)]);
        assert_eq!(map.epoch(partition(0)), Epoch::new(1));
        assert_eq!(map.version(), 1);
    }

    #[test]
    fn role_for_node() {
        let mut map = PartitionMap::new();

        let assignment = PartitionAssignment::new(node(1), vec![node(2), node(3)], Epoch::new(1));
        map.set(partition(0), assignment);

        assert_eq!(map.role_for(partition(0), node(1)), PartitionRole::Primary);
        assert_eq!(map.role_for(partition(0), node(2)), PartitionRole::Replica);
        assert_eq!(map.role_for(partition(0), node(3)), PartitionRole::Replica);
        assert_eq!(map.role_for(partition(0), node(4)), PartitionRole::None);
    }

    #[test]
    fn partitions_for_node() {
        let mut map = PartitionMap::new();

        map.set(
            partition(0),
            PartitionAssignment::new(node(1), vec![node(2)], Epoch::new(1)),
        );
        map.set(
            partition(1),
            PartitionAssignment::new(node(2), vec![node(1)], Epoch::new(1)),
        );
        map.set(
            partition(2),
            PartitionAssignment::new(node(1), vec![node(3)], Epoch::new(1)),
        );

        let node1_partitions = map.partitions_for_node(node(1));
        assert_eq!(node1_partitions.len(), 3);
        assert_eq!(map.primary_count(node(1)), 2);
        assert_eq!(map.replica_count(node(1)), 1);
    }

    #[test]
    fn all_nodes_and_any_assignment() {
        let mut map = PartitionMap::new();
        assert!(!map.has_any_assignment(node(1)));

        map.set(
            partition(0),
            PartitionAssignment::new(node(1), vec![node(2), node(3)], Epoch::new(1)),
        );

        // Primary comes first, replicas keep their stored order.
        assert_eq!(
            map.get(partition(0)).all_nodes(),
            vec![node(1), node(2), node(3)]
        );
        assert!(map.has_any_assignment(node(1)));
        assert!(map.has_any_assignment(node(3)));
        assert!(!map.has_any_assignment(node(4)));
    }
}