1use crate::types::{FencingToken, LogIndex, NodeId, NodeState, Term};
4use std::collections::HashMap;
5use std::sync::atomic::{AtomicU64, Ordering};
6use tracing::debug;
7
/// Term and vote bookkeeping for a single node.
///
/// The name follows Raft's "persistent state" terminology. Nothing in this
/// type performs I/O, so durably storing it is presumably the caller's job
/// (TODO confirm against the storage layer).
#[derive(Debug, Clone)]
pub struct PersistentState {
    /// Latest term recorded for this node; `new` starts it at 0.
    pub current_term: Term,
    /// Candidate recorded by the last `grant_vote`, or `None`. Reset to
    /// `None` whenever `update_term` advances to a higher term.
    pub voted_for: Option<NodeId>,
}
16
17impl PersistentState {
18 pub fn new() -> Self {
20 Self {
21 current_term: 0,
22 voted_for: None,
23 }
24 }
25
26 pub fn update_term(&mut self, new_term: Term) {
28 if new_term > self.current_term {
29 debug!(
30 old_term = self.current_term,
31 new_term = new_term,
32 "Persistent state: term updated, cleared voted_for"
33 );
34 self.current_term = new_term;
35 self.voted_for = None;
36 }
37 }
38
39 pub fn grant_vote(&mut self, candidate_id: NodeId) {
41 debug!(
42 candidate_id = candidate_id,
43 term = self.current_term,
44 "Persistent state: vote granted"
45 );
46 self.voted_for = Some(candidate_id);
47 }
48}
49
50impl Default for PersistentState {
51 fn default() -> Self {
52 Self::new()
53 }
54}
55
/// Atomic holder for the current fencing-token value.
///
/// The raw `u64` is wrapped in [`FencingToken`] whenever it is handed out;
/// its bit layout (term / sequence) is defined by `FencingToken`, not here.
pub struct FencingTokenState {
    /// Raw value that the next `issue_token` call will return (that call
    /// hands out the stored value and then increments it).
    current_token: AtomicU64,
}
65
66impl FencingTokenState {
67 pub fn new() -> Self {
69 Self {
70 current_token: AtomicU64::new(0),
71 }
72 }
73
74 pub fn issue_token(&self) -> FencingToken {
78 let raw = self.current_token.fetch_add(1, Ordering::SeqCst);
79 FencingToken(raw)
80 }
81
82 pub fn bump_term_token(&self, new_term: u32) {
86 let token = FencingToken::new_leader_term(new_term);
87 self.current_token.store(token.raw(), Ordering::SeqCst);
88 }
89
90 pub fn current_raw(&self) -> u64 {
92 self.current_token.load(Ordering::SeqCst)
93 }
94}
95
96impl Default for FencingTokenState {
97 fn default() -> Self {
98 Self::new()
99 }
100}
101
102impl std::fmt::Debug for FencingTokenState {
103 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
104 let raw = self.current_token.load(Ordering::Relaxed);
105 let token = FencingToken(raw);
106 f.debug_struct("FencingTokenState")
107 .field("term", &token.term())
108 .field("seq", &token.seq())
109 .finish()
110 }
111}
112
/// In-memory role information for a node: which role it currently plays and
/// which leader (if any) it knows about.
#[derive(Debug, Clone)]
pub struct VolatileState {
    /// Current role; `new` starts as `NodeState::Follower`.
    pub node_state: NodeState,
    /// Leader recorded by the last `become_follower` call. Cleared to `None`
    /// by `become_candidate` and `become_leader`.
    pub leader_id: Option<NodeId>,
}
121
122impl VolatileState {
123 pub fn new() -> Self {
125 Self {
126 node_state: NodeState::Follower,
127 leader_id: None,
128 }
129 }
130
131 pub fn become_follower(&mut self, leader_id: Option<NodeId>) {
133 self.node_state = NodeState::Follower;
134 self.leader_id = leader_id;
135 }
136
137 pub fn become_candidate(&mut self) {
139 self.node_state = NodeState::Candidate;
140 self.leader_id = None;
141 }
142
143 pub fn become_leader(&mut self) {
145 self.node_state = NodeState::Leader;
146 self.leader_id = None;
147 }
148
149 pub fn is_leader(&self) -> bool {
151 self.node_state == NodeState::Leader
152 }
153
154 pub fn is_candidate(&self) -> bool {
156 self.node_state == NodeState::Candidate
157 }
158
159 pub fn is_follower(&self) -> bool {
161 self.node_state == NodeState::Follower
162 }
163}
164
165impl Default for VolatileState {
166 fn default() -> Self {
167 Self::new()
168 }
169}
170
/// Per-peer replication bookkeeping kept by a leader (the names mirror
/// Raft's `nextIndex` / `matchIndex`).
#[derive(Debug, Clone)]
pub struct LeaderState {
    /// For each peer, the log index to try next. `new` seeds it with
    /// `last_log_index + 1`; the `update_*` methods move it afterwards.
    pub next_index: HashMap<NodeId, LogIndex>,
    /// For each peer, the index last reported through `update_success`;
    /// `new` seeds it with 0.
    pub match_index: HashMap<NodeId, LogIndex>,
}
179
180impl LeaderState {
181 pub fn new(peers: &[NodeId], last_log_index: LogIndex) -> Self {
183 let mut next_index = HashMap::new();
184 let mut match_index = HashMap::new();
185
186 for &peer in peers {
187 next_index.insert(peer, last_log_index + 1);
188 match_index.insert(peer, 0);
189 }
190
191 Self {
192 next_index,
193 match_index,
194 }
195 }
196
197 pub fn update_success(&mut self, peer: NodeId, match_idx: LogIndex) {
199 self.match_index.insert(peer, match_idx);
200 self.next_index.insert(peer, match_idx + 1);
201 }
202
203 pub fn update_failure(&mut self, peer: NodeId) {
205 if let Some(next_idx) = self.next_index.get_mut(&peer) {
206 if *next_idx > 1 {
207 *next_idx -= 1;
208 }
209 }
210 }
211
212 pub fn update_failure_with_hint(
226 &mut self,
227 peer: NodeId,
228 conflict_index: Option<LogIndex>,
229 _conflict_term: Option<Term>,
230 follower_last_index: LogIndex,
231 ) {
232 let new_next = match conflict_index {
233 Some(ci) if ci > 0 => {
234 ci
236 }
237 _ => {
238 (follower_last_index + 1).max(1)
241 }
242 };
243
244 let clamped = new_next.max(1);
246
247 if let Some(next_idx) = self.next_index.get_mut(&peer) {
249 if clamped < *next_idx {
250 *next_idx = clamped;
251 } else {
252 if *next_idx > 1 {
254 *next_idx -= 1;
255 }
256 }
257 } else {
258 self.next_index.insert(peer, clamped);
259 }
260 }
261
262 pub fn calculate_commit_index_joint(
268 &self,
269 leader_id: NodeId,
270 current_last_index: LogIndex,
271 config_state: &crate::types::ConfigState,
272 ) -> LogIndex {
273 match config_state {
274 crate::types::ConfigState::Stable(config) => {
275 let quorum = config.quorum_size();
276 self.calculate_commit_index(current_last_index, quorum)
277 }
278 crate::types::ConfigState::Joint { old, new } => {
279 let old_commit = Self::quorum_index_for_config(
282 old,
283 leader_id,
284 current_last_index,
285 &self.match_index,
286 );
287 let new_commit = Self::quorum_index_for_config(
288 new,
289 leader_id,
290 current_last_index,
291 &self.match_index,
292 );
293
294 old_commit.min(new_commit)
296 }
297 }
298 }
299
300 fn quorum_index_for_config(
302 config: &crate::types::ClusterConfig,
303 leader_id: NodeId,
304 leader_last_index: LogIndex,
305 match_index: &HashMap<NodeId, LogIndex>,
306 ) -> LogIndex {
307 let member_ids = config.member_ids();
308 let quorum = config.quorum_size();
309
310 let mut indices: Vec<LogIndex> = member_ids
312 .iter()
313 .map(|&id| {
314 if id == leader_id {
315 leader_last_index
316 } else {
317 match_index.get(&id).copied().unwrap_or(0)
318 }
319 })
320 .collect();
321
322 indices.sort_unstable();
323 indices.reverse();
324
325 if indices.len() >= quorum && quorum > 0 {
328 indices[quorum - 1]
329 } else {
330 0
331 }
332 }
333
334 pub fn get_next_index(&self, peer: NodeId) -> LogIndex {
336 self.next_index.get(&peer).copied().unwrap_or(1)
337 }
338
339 pub fn get_match_index(&self, peer: NodeId) -> LogIndex {
341 self.match_index.get(&peer).copied().unwrap_or(0)
342 }
343
344 pub fn calculate_commit_index(&self, current_index: LogIndex, quorum_size: usize) -> LogIndex {
347 let mut indices: Vec<LogIndex> = self.match_index.values().copied().collect();
349 indices.sort_unstable();
350 indices.reverse();
351
352 if indices.len() + 1 >= quorum_size {
355 let quorum_idx = quorum_size.saturating_sub(2);
356 if quorum_idx < indices.len() {
357 return indices[quorum_idx].min(current_index);
358 }
359 }
360
361 0
362 }
363}
364
/// Vote tally kept by a node while it is running for election.
#[derive(Debug, Clone)]
pub struct CandidateState {
    /// Ids of the nodes whose votes have been recorded, without duplicates
    /// when filled through `record_vote`. `new` seeds it with the
    /// candidate's own id.
    pub votes_received: Vec<NodeId>,
}
371
372impl CandidateState {
373 pub fn new(self_id: NodeId) -> Self {
375 Self {
376 votes_received: vec![self_id],
377 }
378 }
379
380 pub fn record_vote(&mut self, peer: NodeId) {
382 if !self.votes_received.contains(&peer) {
383 self.votes_received.push(peer);
384 }
385 }
386
387 pub fn has_quorum(&self, quorum_size: usize) -> bool {
389 self.votes_received.len() >= quorum_size
390 }
391
392 pub fn vote_count(&self) -> usize {
394 self.votes_received.len()
395 }
396}
397
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh `PersistentState` starts at term 0 with no vote.
    #[test]
    fn test_persistent_state_new() {
        let state = PersistentState::new();
        assert_eq!(state.current_term, 0);
        assert_eq!(state.voted_for, None);
    }

    /// Moving to a higher term stores it and clears the recorded vote.
    #[test]
    fn test_persistent_state_update_term() {
        let mut state = PersistentState::new();
        state.voted_for = Some(1);
        state.update_term(5);

        assert_eq!(state.current_term, 5);
        assert_eq!(state.voted_for, None);
    }

    /// `grant_vote` records the candidate id.
    #[test]
    fn test_persistent_state_grant_vote() {
        let mut state = PersistentState::new();
        state.grant_vote(2);
        assert_eq!(state.voted_for, Some(2));
    }

    /// A fresh `VolatileState` is a follower with no known leader.
    #[test]
    fn test_volatile_state_new() {
        let state = VolatileState::new();
        assert_eq!(state.node_state, NodeState::Follower);
        assert_eq!(state.leader_id, None);
    }

    /// Each `become_*` call sets the role and the expected `leader_id`.
    #[test]
    fn test_volatile_state_transitions() {
        let mut state = VolatileState::new();

        state.become_candidate();
        assert!(state.is_candidate());
        assert_eq!(state.leader_id, None);

        state.become_leader();
        assert!(state.is_leader());
        assert_eq!(state.leader_id, None);

        state.become_follower(Some(5));
        assert!(state.is_follower());
        assert_eq!(state.leader_id, Some(5));
    }

    /// Peers start at `last_log_index + 1` / match 0.
    #[test]
    fn test_leader_state_new() {
        let peers = vec![1, 2, 3];
        let leader_state = LeaderState::new(&peers, 10);

        assert_eq!(leader_state.get_next_index(1), 11);
        assert_eq!(leader_state.get_match_index(1), 0);
    }

    /// A success sets match to the reported index and next to match + 1.
    #[test]
    fn test_leader_state_update_success() {
        let peers = vec![1, 2, 3];
        let mut leader_state = LeaderState::new(&peers, 10);

        leader_state.update_success(1, 12);
        assert_eq!(leader_state.get_next_index(1), 13);
        assert_eq!(leader_state.get_match_index(1), 12);
    }

    /// A plain failure steps `next_index` back by one (11 -> 10).
    #[test]
    fn test_leader_state_update_failure() {
        let peers = vec![1, 2, 3];
        let mut leader_state = LeaderState::new(&peers, 10);

        leader_state.update_failure(1);
        assert_eq!(leader_state.get_next_index(1), 10);
    }

    /// Five-node cluster (leader + 4 peers), quorum 3: the leader at 10 plus
    /// the peers at 9 and 8 make index 8 the highest one held by a quorum.
    #[test]
    fn test_leader_state_calculate_commit_index() {
        let peers = vec![2, 3, 4, 5];
        let mut leader_state = LeaderState::new(&peers, 10);

        leader_state.update_success(2, 8);
        leader_state.update_success(3, 9);
        leader_state.update_success(4, 7);
        leader_state.update_success(5, 6);

        let commit_idx = leader_state.calculate_commit_index(10, 3);
        assert_eq!(commit_idx, 8);
    }

    /// A new tally already contains the candidate's own vote.
    #[test]
    fn test_candidate_state_new() {
        let state = CandidateState::new(1);
        assert_eq!(state.vote_count(), 1);
        assert!(state.votes_received.contains(&1));
    }

    /// Votes from distinct peers accumulate.
    #[test]
    fn test_candidate_state_record_vote() {
        let mut state = CandidateState::new(1);
        state.record_vote(2);
        state.record_vote(3);

        assert_eq!(state.vote_count(), 3);
        assert!(state.has_quorum(2));
    }

    /// Quorum is reached exactly when the vote count meets the threshold.
    #[test]
    fn test_candidate_state_has_quorum() {
        let mut state = CandidateState::new(1);
        assert!(state.has_quorum(1));
        assert!(!state.has_quorum(2));

        state.record_vote(2);
        assert!(state.has_quorum(2));
    }

    /// A non-zero `conflict_index` below the current `next_index` is taken
    /// as the new `next_index` directly.
    #[test]
    fn test_update_failure_with_hint_jumps_to_conflict() {
        let peers = vec![2, 3, 4];
        let mut ls = LeaderState::new(&peers, 10);
        ls.update_failure_with_hint(2, Some(5), Some(2), 8);
        assert_eq!(
            ls.get_next_index(2),
            5,
            "should jump back to conflict_index"
        );
    }

    /// Without a conflict index the follower's last index + 1 is used.
    #[test]
    fn test_update_failure_with_hint_no_hint_uses_last_index() {
        let peers = vec![2, 3];
        let mut ls = LeaderState::new(&peers, 10);

        ls.update_failure_with_hint(2, None, None, 3);
        assert_eq!(
            ls.get_next_index(2),
            4,
            "should use follower_last_index + 1"
        );
    }

    /// A hint at or beyond the current `next_index` (6 here) is not applied;
    /// `next_index` falls back to a single-step decrement (6 -> 5).
    #[test]
    fn test_update_failure_with_hint_does_not_go_forward() {
        let peers = vec![2, 3];
        let mut ls = LeaderState::new(&peers, 5);
        ls.update_failure_with_hint(2, Some(10), Some(1), 9);
        assert_eq!(ls.get_next_index(2), 5);
    }

    /// Stable five-member config, quorum 3: progress is
    /// leader 10, peers 8 / 7 / 6, and node 5 untracked, so 7 is the
    /// highest index held by three members.
    #[test]
    fn test_calculate_commit_index_joint_stable() {
        use crate::types::{ClusterConfig, ConfigState};

        let peers = vec![2, 3, 4];
        let mut ls = LeaderState::new(&peers, 10);
        ls.update_success(2, 8);
        ls.update_success(3, 7);
        ls.update_success(4, 6);

        let config = ConfigState::Stable(ClusterConfig::new(
            vec![
                (1, "a".into()),
                (2, "b".into()),
                (3, "c".into()),
                (4, "d".into()),
                (5, "e".into()),
            ],
            0,
        ));

        let commit = ls.calculate_commit_index_joint(1, 10, &config);
        assert_eq!(commit, 7);
    }

    /// Joint config: old {1,2,3} yields 8 (10, 8, 7 with quorum 2) and
    /// new {1,2,3,4} yields 8 (10, 9, 8, 7 with quorum 3); the result is
    /// the minimum of the two.
    #[test]
    fn test_calculate_commit_index_joint_consensus() {
        use crate::types::{ClusterConfig, ConfigState};

        let peers = vec![2, 3, 4];
        let mut ls = LeaderState::new(&peers, 10);
        ls.update_success(2, 8);
        ls.update_success(3, 7);
        ls.update_success(4, 9);

        let old = ClusterConfig::new(vec![(1, "a".into()), (2, "b".into()), (3, "c".into())], 0);
        let new = ClusterConfig::new(
            vec![
                (1, "a".into()),
                (2, "b".into()),
                (3, "c".into()),
                (4, "d".into()),
            ],
            1,
        );

        let config = ConfigState::Joint { old, new };

        let commit = ls.calculate_commit_index_joint(1, 10, &config);
        assert_eq!(commit, 8);
    }

    /// Joint config where the OLD config is the binding constraint.
    ///
    /// Old {1,2,3}: progress 10 / 3 / 4, quorum 2 -> 4.
    /// New {1,3,4,5}: progress 10 / 4 / 9 / 9, quorum 3 -> 9.
    /// The joint result must be the smaller value, 4, even though the new
    /// config alone could commit up to 9.
    #[test]
    fn test_calculate_commit_index_joint_limited_by_old() {
        use crate::types::{ClusterConfig, ConfigState};

        let peers = vec![2, 3, 4, 5];
        let mut ls = LeaderState::new(&peers, 10);
        ls.update_success(2, 3);
        ls.update_success(3, 4);
        ls.update_success(4, 9);
        ls.update_success(5, 9);

        let old = ClusterConfig::new(vec![(1, "a".into()), (2, "b".into()), (3, "c".into())], 0);
        let new = ClusterConfig::new(
            vec![
                (1, "a".into()),
                (3, "c".into()),
                (4, "d".into()),
                (5, "e".into()),
            ],
            1,
        );

        let config = ConfigState::Joint { old, new };

        let commit = ls.calculate_commit_index_joint(1, 10, &config);
        assert_eq!(commit, 4, "old config quorum must cap the joint commit index");
    }
}