1#[derive(
7 Debug,
8 Clone,
9 PartialEq,
10 Eq,
11 serde::Serialize,
12 serde::Deserialize,
13 zerompk::ToMessagePack,
14 zerompk::FromMessagePack,
15)]
16pub struct HardState {
17 pub current_term: u64,
19 pub voted_for: u64,
21}
22
23impl HardState {
24 pub fn new() -> Self {
25 Self {
26 current_term: 0,
27 voted_for: 0,
28 }
29 }
30}
31
32impl Default for HardState {
33 fn default() -> Self {
34 Self::new()
35 }
36}
37
/// Role a node can hold.
///
/// The variant names follow the usual Raft vocabulary. What distinguishes
/// `Learner` from `Observer` is decided by the code that matches on this enum
/// and cannot be told from this definition alone.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NodeRole {
    /// Follower role.
    Follower,
    /// Candidate role (presumably: currently standing for election).
    Candidate,
    /// Leader role.
    Leader,
    /// Learner role (presumably non-voting — confirm with the state machine).
    Learner,
    /// Observer role (presumably non-voting — confirm with the state machine).
    Observer,
}
52
/// Role attributed to a peer.
///
/// Only two kinds are distinguished here; how each is treated is decided by
/// the code that matches on this enum.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PeerRole {
    /// Voting peer (presumably counted towards quorum — confirm with callers).
    Voter,
    /// Observer peer (presumably receives entries without voting — confirm).
    Observer,
}
69
/// Commit and apply progress counters for a node.
///
/// Unlike `HardState`, this type derives no serialization traits. Both
/// counters are zero in a freshly constructed value, which is exactly what the
/// derived `Default` yields for `u64` fields.
#[derive(Debug, Clone, Default)]
pub struct VolatileState {
    /// Commit index; 0 in a fresh state.
    pub commit_index: u64,
    /// Last-applied index; 0 in a fresh state.
    pub last_applied: u64,
}

impl VolatileState {
    /// Builds a state with both counters at 0 (same value as `default()`).
    pub fn new() -> Self {
        Self::default()
    }
}
93
/// Per-observer tracking record kept by `LeaderState`.
#[derive(Debug, Clone)]
pub struct ObserverState {
    /// Next index for this observer; initialised to `last_log_index + 1` by
    /// `LeaderState`.
    pub next_index: u64,
    /// Match index for this observer; initialised to 0.
    pub match_index: u64,
    /// Pending counter compared against [`ObserverState::MAX_PENDING`];
    /// initialised to 0. Presumably the number of in-flight, unacknowledged
    /// messages — confirm with the code that increments it.
    pub pending_count: u32,
}

impl ObserverState {
    /// Upper bound for `pending_count`: `LeaderState::observer_can_receive`
    /// answers `true` only while the counter is strictly below this value.
    pub const MAX_PENDING: u32 = 256;
}
118
119#[derive(Debug, Clone)]
121pub struct LeaderState {
122 pub next_index: Vec<(u64, u64)>,
124 pub match_index: Vec<(u64, u64)>,
126 pub observer_states: Vec<(u64, ObserverState)>,
129}
130
131impl LeaderState {
132 pub fn new(peers: &[u64], observers: &[u64], last_log_index: u64) -> Self {
134 Self {
135 next_index: peers.iter().map(|&id| (id, last_log_index + 1)).collect(),
136 match_index: peers.iter().map(|&id| (id, 0)).collect(),
137 observer_states: observers
138 .iter()
139 .map(|&id| {
140 (
141 id,
142 ObserverState {
143 next_index: last_log_index + 1,
144 match_index: 0,
145 pending_count: 0,
146 },
147 )
148 })
149 .collect(),
150 }
151 }
152
153 pub fn next_index_for(&self, peer: u64) -> u64 {
154 self.next_index
155 .iter()
156 .find(|&&(id, _)| id == peer)
157 .map(|&(_, idx)| idx)
158 .unwrap_or(1)
159 }
160
161 pub fn set_next_index(&mut self, peer: u64, index: u64) {
162 if let Some(entry) = self.next_index.iter_mut().find(|e| e.0 == peer) {
163 entry.1 = index;
164 }
165 }
166
167 pub fn match_index_for(&self, peer: u64) -> u64 {
168 self.match_index
169 .iter()
170 .find(|&&(id, _)| id == peer)
171 .map(|&(_, idx)| idx)
172 .unwrap_or(0)
173 }
174
175 pub fn set_match_index(&mut self, peer: u64, index: u64) {
176 if let Some(entry) = self.match_index.iter_mut().find(|e| e.0 == peer) {
177 entry.1 = index;
178 }
179 }
180
181 pub fn add_peer(&mut self, peer: u64, last_log_index: u64) {
183 if !self.next_index.iter().any(|&(id, _)| id == peer) {
184 self.next_index.push((peer, last_log_index + 1));
185 self.match_index.push((peer, 0));
186 }
187 }
188
189 pub fn remove_peer(&mut self, peer: u64) {
191 self.next_index.retain(|&(id, _)| id != peer);
192 self.match_index.retain(|&(id, _)| id != peer);
193 }
194
195 pub fn peers(&self) -> Vec<u64> {
197 self.next_index.iter().map(|&(id, _)| id).collect()
198 }
199
200 pub fn add_observer(&mut self, observer: u64, last_log_index: u64) {
202 if !self.observer_states.iter().any(|&(id, _)| id == observer) {
203 self.observer_states.push((
204 observer,
205 ObserverState {
206 next_index: last_log_index + 1,
207 match_index: 0,
208 pending_count: 0,
209 },
210 ));
211 }
212 }
213
214 pub fn remove_observer(&mut self, observer: u64) {
216 self.observer_states.retain(|&(id, _)| id != observer);
217 }
218
219 pub fn observer_state_mut(&mut self, observer: u64) -> Option<&mut ObserverState> {
221 self.observer_states
222 .iter_mut()
223 .find(|(id, _)| *id == observer)
224 .map(|(_, state)| state)
225 }
226
227 pub fn observer_can_receive(&self, observer: u64) -> bool {
230 self.observer_states
231 .iter()
232 .find(|(id, _)| *id == observer)
233 .map(|(_, state)| state.pending_count < ObserverState::MAX_PENDING)
234 .unwrap_or(false)
235 }
236}
237
#[cfg(test)]
mod tests {
    //! Unit tests for the state types and the leader's tracking tables.

    use super::*;

    #[test]
    fn hard_state_default() {
        let hs = HardState::new();
        assert_eq!(hs.current_term, 0);
        assert_eq!(hs.voted_for, 0);
        // The test is named after `Default`, so exercise that impl as well.
        assert_eq!(HardState::default(), hs);
    }

    #[test]
    fn volatile_state_default() {
        let fresh = VolatileState::new();
        assert_eq!(fresh.commit_index, 0);
        assert_eq!(fresh.last_applied, 0);

        let dflt = VolatileState::default();
        assert_eq!(dflt.commit_index, 0);
        assert_eq!(dflt.last_applied, 0);
    }

    #[test]
    fn leader_state_initialization() {
        let peers = vec![2, 3, 4];
        let ls = LeaderState::new(&peers, &[], 10);
        assert_eq!(ls.peers(), peers);
        assert_eq!(ls.next_index_for(2), 11);
        assert_eq!(ls.next_index_for(3), 11);
        assert_eq!(ls.next_index_for(4), 11);
        assert_eq!(ls.match_index_for(2), 0);
        assert_eq!(ls.match_index_for(4), 0);
    }

    #[test]
    fn leader_state_unknown_peer_fallbacks() {
        let mut ls = LeaderState::new(&[2], &[], 10);
        assert_eq!(ls.next_index_for(99), 1);
        assert_eq!(ls.match_index_for(99), 0);

        // Setters silently ignore peers that are not tracked.
        ls.set_next_index(99, 50);
        ls.set_match_index(99, 49);
        assert_eq!(ls.next_index_for(99), 1);
        assert_eq!(ls.match_index_for(99), 0);
        assert_eq!(ls.peers(), vec![2]);
    }

    #[test]
    fn leader_state_update() {
        let peers = vec![2, 3];
        let mut ls = LeaderState::new(&peers, &[], 5);
        ls.set_next_index(2, 8);
        ls.set_match_index(2, 7);
        assert_eq!(ls.next_index_for(2), 8);
        assert_eq!(ls.match_index_for(2), 7);
        // The other peer must be unaffected.
        assert_eq!(ls.next_index_for(3), 6);
        assert_eq!(ls.match_index_for(3), 0);
    }

    #[test]
    fn leader_state_membership_changes() {
        let mut ls = LeaderState::new(&[2, 3], &[], 5);

        ls.add_peer(4, 9);
        assert_eq!(ls.peers(), vec![2, 3, 4]);
        assert_eq!(ls.next_index_for(4), 10);
        assert_eq!(ls.match_index_for(4), 0);

        // Re-adding a tracked peer must not reset its progress.
        ls.set_next_index(4, 12);
        ls.add_peer(4, 20);
        assert_eq!(ls.next_index_for(4), 12);
        assert_eq!(ls.peers(), vec![2, 3, 4]);

        ls.remove_peer(3);
        assert_eq!(ls.peers(), vec![2, 4]);
        assert_eq!(ls.next_index_for(3), 1);
        assert_eq!(ls.match_index_for(3), 0);
    }

    #[test]
    fn leader_state_observers() {
        let mut ls = LeaderState::new(&[2], &[7], 5);
        // Observers are tracked separately from voters.
        assert_eq!(ls.peers(), vec![2]);
        assert!(ls.observer_can_receive(7));
        assert!(!ls.observer_can_receive(8));

        let state = ls.observer_state_mut(7).expect("observer 7 was registered");
        assert_eq!(state.next_index, 6);
        assert_eq!(state.match_index, 0);
        assert_eq!(state.pending_count, 0);

        // One below the limit is still allowed; hitting the limit is not.
        state.pending_count = ObserverState::MAX_PENDING - 1;
        assert!(ls.observer_can_receive(7));
        ls.observer_state_mut(7)
            .expect("observer 7 was registered")
            .pending_count = ObserverState::MAX_PENDING;
        assert!(!ls.observer_can_receive(7));

        // Re-adding a tracked observer must not reset its record.
        ls.add_observer(7, 100);
        assert!(!ls.observer_can_receive(7));

        ls.add_observer(8, 9);
        assert!(ls.observer_can_receive(8));
        assert_eq!(ls.observer_state_mut(8).map(|s| s.next_index), Some(10));

        ls.remove_observer(7);
        assert!(ls.observer_state_mut(7).is_none());
        assert!(!ls.observer_can_receive(7));
    }

    #[test]
    fn node_role_equality() {
        assert_eq!(NodeRole::Follower, NodeRole::Follower);
        assert_ne!(NodeRole::Follower, NodeRole::Leader);
    }

    #[test]
    fn peer_role_equality() {
        assert_eq!(PeerRole::Voter, PeerRole::Voter);
        assert_ne!(PeerRole::Voter, PeerRole::Observer);
    }
}