1use std::collections::{HashMap, HashSet};
18
19use crate::constant::*;
20use crate::types::{ConfigStage, Entry, EntryMeta, HardState, MemberState};
21
22use chrono::{prelude::*, DateTime};
23use log::{debug, error, info, trace, warn};
24
/// The role a node currently plays within a single channel.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum Role {
    /// The node leads the channel for the current term.
    Leader,
    /// Intermediate role entered through `ChannelInfo::to_student`; the node
    /// already names itself leader while in this role.
    Student,
    /// The node follows another leader (or has none yet).
    Follower,
    /// The node is running an election for a new term.
    Candidate,
}

impl std::fmt::Display for Role {
    /// Writes the bare variant name (the same text as the `Debug` output).
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let label = match self {
            Role::Leader => "Leader",
            Role::Student => "Student",
            Role::Follower => "Follower",
            Role::Candidate => "Candidate",
        };
        f.write_str(label)
    }
}
38
39#[inline(always)]
40fn clone_last_log_meta(metas: &Vec<EntryMeta>) -> EntryMeta {
41 metas
42 .last()
43 .map(|e| e.clone())
44 .unwrap_or(EntryMeta::default())
45}
46
/// Returns the quorum size used for a group of `len` members: `ceil(len / 2)`.
///
/// For odd `len` this is a strict majority. For even `len` it is exactly half
/// (e.g. `majority(4) == 2`).
/// NOTE(review): a classic Raft quorum is `len / 2 + 1`; confirm the
/// even-size behaviour is intended by the protocol design.
#[inline]
pub fn majority(len: usize) -> usize {
    // Equivalent to `(len + 1) / 2`, but cannot overflow at `usize::MAX`.
    len - len / 2
}
51
/// Which snapshot operation, if any, is in progress.
///
/// Semantics are taken from the variant names; the code that drives this
/// state is outside this file view.
#[derive(Debug, Eq, PartialEq)]
pub enum SnapshotState {
    /// A snapshot is being created.
    Creating,
    /// A snapshot is being loaded.
    Loading,
    /// No snapshot operation is in progress.
    None,
}
58
/// Bookkeeping for an incoming snapshot transfer.
///
/// The type name is kept as-is (including its spelling) because it is part
/// of the crate-internal interface.
#[derive(Debug)]
pub(crate) struct SnapshotRecevingState {
    /// Whether a snapshot is currently being received.
    pub receiving: bool,
    /// When the transfer started. Presumably nanoseconds like
    /// `ChannelInfo::last_liveness_at`; TODO confirm against callers.
    pub start_at: i64,
    /// When the last heartbeat for the transfer was seen (same unit as
    /// `start_at`; TODO confirm).
    pub last_heartbeat_at: i64,
}
65
/// Fixed-capacity ring buffer of ids that were sent but not yet acknowledged.
///
/// Ids are appended at the tail (`add`) and released from the head
/// (`free_to`). Once `cap` ids are outstanding, further `add`s are ignored.
#[derive(Debug, Clone)]
pub struct Inflights {
    /// Index in `buf` of the oldest outstanding id.
    pub pos: usize,
    /// Number of outstanding ids currently stored.
    pub cnt: usize,
    /// Maximum number of outstanding ids.
    pub cap: usize,
    /// Ring storage; always `cap` elements long.
    pub buf: Vec<u64>,
}

impl Inflights {
    /// Creates an empty window able to hold `cap` outstanding ids.
    fn new(cap: usize) -> Inflights {
        Inflights {
            pos: 0,
            cnt: 0,
            cap,
            buf: vec![0; cap],
        }
    }

    /// Returns `true` when no more ids can be added.
    fn full(&self) -> bool {
        self.cnt == self.cap
    }

    /// Forgets every outstanding id.
    fn reset(&mut self) {
        self.pos = 0;
        self.cnt = 0;
    }

    /// Appends `id` at the tail; silently ignored when the window is full.
    fn add(&mut self, id: u64) {
        if !self.full() {
            let tail = (self.pos + self.cnt) % self.cap;
            self.buf[tail] = id;
            self.cnt += 1;
        }
    }

    /// Releases, from the head, every consecutive outstanding id `<= to`.
    fn free_to(&mut self, to: u64) {
        if self.cnt == 0 || to < self.buf[self.pos] {
            return;
        }

        // How many ids at the head of the ring are already acknowledged.
        let released = (0..self.cnt)
            .take_while(|&offset| self.buf[(self.pos + offset) % self.cap] <= to)
            .count();

        self.cnt -= released;
        // An empty ring always restarts at slot 0.
        self.pos = if self.cnt == 0 {
            0
        } else {
            (self.pos + released) % self.cap
        };
    }
}
128
/// How the leader currently sends entries to one member.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ProgressState {
    /// One message at a time: after a send the progress is `paused` until a
    /// reply arrives.
    Probe,
    /// Pipelined sends, throttled only by the in-flight window.
    Replicate,
    /// A message for this member is being built; sending is always paused.
    MsgBuilding,
}
135
136#[derive(Debug, Clone)]
137pub struct Progress {
138 pub node_id: u64,
139
140 pub match_id: u64,
141 pub next_id: u64,
142
143 pub granted: bool,
144 pub active: bool,
145
146 pub paused: bool,
147 pub inflights: Inflights,
148 pub state: ProgressState,
149 pub belongs_to: ConfigStage,
150}
151
152impl Progress {
153 pub fn new(node_id: u64, belongs_to: ConfigStage) -> Progress {
154 Progress {
155 node_id,
156 match_id: INVALID_ID,
157 next_id: INVALID_ID + 1,
158
159 granted: false,
160 active: false,
161
162 paused: false,
163 inflights: Inflights::new(128),
164 state: ProgressState::Probe,
165 belongs_to,
166 }
167 }
168
169 pub fn reset(&mut self, last_id: u64) {
170 self.match_id = INVALID_ID;
171 self.next_id = last_id + 1;
172 self.granted = false;
173 self.active = true;
174 self.state = ProgressState::Probe;
175 }
176
177 pub fn try_advance_matched_id(&mut self, hint_id: u64) -> bool {
178 self.paused = false;
179 if self.match_id + 1u64 < hint_id {
180 self.match_id = hint_id - 1u64;
181 if self.next_id <= self.match_id {
182 self.next_id = self.match_id + 1u64;
183 }
184 match &self.state {
185 ProgressState::Probe => {
186 self.inflights.reset();
187 self.state = ProgressState::Replicate;
188 }
189 ProgressState::Replicate => {
190 self.inflights.free_to(self.match_id);
191 }
192 _ => {}
193 }
194 true
195 } else {
196 false
197 }
198 }
199
200 pub fn try_reset_next_id(&mut self, reject_id: u64, hint_id: u64) -> bool {
201 self.paused = false;
202 match &self.state {
204 ProgressState::Replicate if reject_id > self.match_id => {
205 self.state = ProgressState::Probe;
206 self.next_id = self.match_id + 1u64;
207 true
208 }
209 ProgressState::Probe if reject_id == self.next_id => {
210 self.next_id = std::cmp::max(1, hint_id);
211 true
212 }
213 _ => false,
214 }
215 }
216
217 pub fn on_receive_msg(&mut self) {
218 self.active = true;
219 }
220
221 pub fn is_paused(&self) -> bool {
222 match &self.state {
223 ProgressState::Probe => self.paused,
224 ProgressState::MsgBuilding => true,
225 ProgressState::Replicate => self.inflights.full(),
226 }
227 }
228
229 pub fn step_building_msg(&mut self) {
230 self.state = ProgressState::MsgBuilding;
231 }
232
233 pub fn step_builded(&mut self, hint_id: u64) {
234 self.state = ProgressState::Probe;
235 self.paused = false;
236 if self.match_id <= hint_id {
237 self.next_id = hint_id;
238 }
239 }
240
241 pub fn replicate_to(&mut self, last_id: u64) {
242 if last_id <= self.match_id {
243 return;
244 }
245 match &self.state {
246 ProgressState::Probe => {
247 self.paused = true;
248 }
249 ProgressState::Replicate => {
250 self.inflights.add(last_id);
251 self.next_id = last_id + 1u64;
252 }
253 _ => {}
254 }
255 }
256
257 pub fn belongs_to_stage(&self, stage: ConfigStage) -> bool {
258 self.belongs_to == stage || self.belongs_to == ConfigStage::Both
259 }
260}
261
262#[derive(Debug)]
263pub struct ChannelDesc {
264 pub channel_id: u64,
265 pub committed_id: u64,
266 pub last_id: u64,
267 pub hard_state: HardState,
268 pub members: HashMap<u64, MemberState>,
269}
270
271#[derive(Debug, Clone)]
272pub struct ChannelInfo {
273 pub local_id: u64,
274 pub channel_id: u64,
275
276 pub progress_map: HashMap<u64, Progress>,
277
278 pub voted_for: u64,
279 pub term: u64,
280 pub committed_id: u64,
281 pub prev_term_last_id: u64,
282
283 pub leader_id: u64,
284 pub role: Role,
285
286 pub last_liveness_at: i64,
287 pub elapsed_tick: u32,
288
289 pub max_received_entries: Vec<EntryMeta>,
291
292 pub missed_voters: HashSet<u64>,
294 pub learned_voters: HashSet<u64>,
295}
296
297impl ChannelInfo {
298 pub fn build(local_id: u64, channel_id: u64, descs: HashMap<u64, MemberState>) -> ChannelInfo {
299 let desc = ChannelDesc {
300 channel_id,
301 committed_id: INVALID_ID,
302 last_id: INVALID_ID,
303 hard_state: HardState::default(),
304 members: descs,
305 };
306 ChannelInfo::new(local_id, &desc)
307 }
308
309 pub fn new(local_id: u64, channel_desc: &ChannelDesc) -> ChannelInfo {
310 let mut progress_map = channel_desc
311 .members
312 .iter()
313 .map(|(id, desc)| (*id, Progress::new(*id, desc.stage)))
314 .collect::<HashMap<u64, Progress>>();
315
316 let mut initial_local_matched_id = 0;
317 for (id, p) in &mut progress_map {
318 p.reset(channel_desc.last_id);
319 if *id == local_id {
322 p.match_id = channel_desc.last_id;
323 initial_local_matched_id = channel_desc.last_id;
324 }
325 }
326
327 info!(
328 "node {} setup channel {}, voted_for {}, term {}, committed_id {}, last_id {}, matched_id {}",
329 local_id,
330 channel_desc.channel_id,
331 channel_desc.hard_state.voted_for,
332 channel_desc.hard_state.current_term,
333 channel_desc.committed_id,
334 channel_desc.last_id,
335 initial_local_matched_id
336 );
337
338 ChannelInfo {
339 local_id,
340 channel_id: channel_desc.channel_id,
341 progress_map,
342 voted_for: channel_desc.hard_state.voted_for,
343 term: channel_desc.hard_state.current_term,
344 committed_id: channel_desc.committed_id,
345 prev_term_last_id: channel_desc.last_id,
346 leader_id: INVALID_NODE_ID,
347 role: Role::Follower,
348 elapsed_tick: 0,
349 last_liveness_at: Local::now().timestamp_nanos(),
350 max_received_entries: Vec::new(),
351 missed_voters: HashSet::new(),
352 learned_voters: HashSet::new(),
353 }
354 }
355
356 pub fn current_term(&self) -> u64 {
357 return self.term;
358 }
359
360 pub fn last_learned_entry_meta(&self) -> EntryMeta {
361 clone_last_log_meta(&self.max_received_entries)
362 }
363
364 pub fn try_receive_prepare_entries(&mut self, entries: Vec<EntryMeta>) -> bool {
365 let local_last = clone_last_log_meta(&self.max_received_entries);
366 let remote_last = clone_last_log_meta(&entries);
367 if remote_last.term > local_last.term
368 || (remote_last.term == local_last.term && local_last.id < remote_last.id)
369 {
370 debug!(
371 "node {} channel {} receive prepare entries last id {}, term: {}",
372 self.local_id, self.channel_id, remote_last.id, remote_last.term
373 );
374 self.max_received_entries = entries;
375 true
376 } else {
377 false
378 }
379 }
380
381 pub fn reset_tick(&mut self) {
382 self.elapsed_tick = 0;
387 }
388
389 pub fn on_receive_msg(&mut self, from: u64) {
390 if let Some(p) = self.progress_map.get_mut(&from) {
395 p.on_receive_msg();
396 }
397 }
398
399 pub fn advance_quorum_lease(&mut self) -> usize {
400 let mut count = 0;
401 for (_, p) in &mut self.progress_map {
402 if p.active {
403 count += 1;
404 p.active = false;
405 }
406 }
407 count
408 }
409
410 fn reset(&mut self, last_id: u64) {
411 let local_id = self.local_id;
412 self.progress_map.iter_mut().for_each(|(id, progress)| {
413 let last_match_id = progress.match_id;
414 progress.reset(last_id);
415 if *id == local_id {
416 progress.match_id = std::cmp::max(last_match_id, last_id);
417 }
418 });
419 }
420
421 pub fn to_leader(&mut self, last_id: u64, reason: &'static str) {
422 let _prev_role = self.to(Role::Leader, reason);
423 self.leader_id = self.local_id;
424 self.reset(last_id);
425 }
426
427 pub fn to_student(&mut self, last_id: u64, reason: &'static str) {
428 let _prev_role = self.to(Role::Student, reason);
429 self.leader_id = self.local_id;
430 self.missed_voters = self
431 .progress_map
432 .iter()
433 .filter(|(_, p)| !p.granted)
434 .map(|(id, _)| *id)
435 .collect::<HashSet<_>>();
436 self.learned_voters = HashSet::new();
437 self.max_received_entries.clear();
438 self.reset(last_id);
439 }
440
441 pub fn to_candidate(&mut self, last_id: u64, reason: &'static str) {
442 let _prev_role = self.to(Role::Candidate, reason);
443 self.term += 1;
444 self.leader_id = INVALID_NODE_ID;
445 self.prev_term_last_id = last_id;
446 self.voted_for = self.local_id;
447 self.max_received_entries.clear();
448 self.reset(INVALID_ID);
449 }
450
451 pub fn to_follower(
452 &mut self,
453 leader_id: u64,
454 target_term: u64,
455 last_id: u64,
456 reason: &'static str,
457 ) {
458 let _prev_role = self.to(Role::Follower, reason);
459 if self.term < target_term {
460 info!(
461 "node {} channel {} advance term from {} to {}",
462 self.local_id, self.channel_id, self.term, target_term
463 );
464 self.term = target_term;
465 self.voted_for = INVALID_NODE_ID;
466 self.prev_term_last_id = last_id;
467 }
468 if self.leader_id != leader_id {
469 info!(
470 "node {} channel {} follow leader {} from {} at term {}",
471 self.local_id, self.channel_id, leader_id, self.leader_id, self.term
472 );
473 self.leader_id = leader_id;
474 }
475 self.reset(INVALID_ID);
476 }
477
478 fn to(&mut self, target_role: Role, reason: &'static str) -> Role {
479 info!(
480 "node {} channel {} change role from {} to {} at term {}, reason: {}",
481 self.local_id, self.channel_id, self.role, target_role, self.term, reason
482 );
483 let prev_role = self.role;
484 self.role = target_role;
485 self.elapsed_tick = 0;
486 prev_role
487 }
488
489 pub fn is_already_promise_others(&self, node_id: u64) -> bool {
490 self.voted_for != INVALID_NODE_ID && self.voted_for != node_id
492 }
493
494 pub fn try_make_promise(&mut self, node_id: u64) -> bool {
495 if !self.is_already_promise_others(node_id) {
496 debug!(
497 "node {} channel {} take promise for {}",
498 self.local_id, self.channel_id, node_id
499 );
500 self.voted_for = node_id;
501 self.reset_tick();
502 true
503 } else {
504 false
505 }
506 }
507
508 pub fn receive_promise(&mut self, node_id: u64) {
509 match self.progress_map.get_mut(&node_id) {
510 Some(p) => p.granted = true,
511 None => {}
512 };
513 }
514
515 pub fn stage_majority(&self, stage: ConfigStage) -> usize {
516 majority(
517 self.progress_map
518 .iter()
519 .filter(|(_, p)| p.belongs_to_stage(stage))
520 .count(),
521 )
522 }
523
524 pub fn current_term_safe_commit_id(&self) -> u64 {
525 std::cmp::max(self.committed_id, self.prev_term_last_id)
526 }
527
528 fn receive_stage_majority_promise(&self, stage: ConfigStage) -> bool {
529 let granted_members = self
530 .progress_map
531 .iter()
532 .filter(|(_, p)| p.granted && p.belongs_to_stage(stage))
533 .map(|(k, _)| *k)
534 .collect::<Vec<u64>>();
535
536 return granted_members.len() >= self.stage_majority(stage);
537 }
538
539 pub fn receive_majority_promise(&self, stage: ConfigStage) -> bool {
540 match &stage {
541 ConfigStage::Both => {
542 self.receive_stage_majority_promise(ConfigStage::Old)
543 && self.receive_stage_majority_promise(ConfigStage::New)
544 }
545 _ => self.receive_stage_majority_promise(ConfigStage::New),
546 }
547 }
548
549 fn compute_candidate_id(&self, stage: ConfigStage) -> u64 {
550 let mut match_ids = self
551 .progress_map
552 .iter()
553 .filter(|(_, p)| p.belongs_to_stage(stage))
554 .map(|(_, p)| p.match_id)
555 .collect::<Vec<_>>();
556 match_ids.sort();
557 let total_numbers = match_ids.len();
558 if total_numbers == 0 {
559 self.committed_id
560 } else {
561 match_ids[total_numbers - majority(total_numbers)]
562 }
563 }
564
565 pub fn try_advance_committed_id(&mut self, stage: ConfigStage) -> bool {
566 let candidate_id = match &stage {
567 ConfigStage::Both => std::cmp::min(
568 self.compute_candidate_id(ConfigStage::New),
569 self.compute_candidate_id(ConfigStage::Old),
570 ),
571 _ => self.compute_candidate_id(ConfigStage::New),
572 };
573 if candidate_id > std::cmp::max(self.prev_term_last_id, self.committed_id) {
574 trace!(
575 "node {} channel {} advance committed index from {} to {}, stage: {:?}",
576 self.local_id,
577 self.channel_id,
578 self.committed_id,
579 candidate_id,
580 stage
581 );
582 self.committed_id = candidate_id;
583 true
584 } else {
585 false
586 }
587 }
588
589 pub fn update_committed_id(&mut self, id: u64) {
590 if self.committed_id < id {
591 debug!(
592 "node {} channel {} update committed id from {} to {}",
593 self.local_id, self.channel_id, self.committed_id, id
594 );
595 self.committed_id = id;
596 }
597 }
598
599 pub fn update_progress(&mut self, remote: u64, reject: bool, msg_id: u64, hint_id: u64) {
600 let is_leader = self.is_leader();
601 if let Some(progress) = self.progress_map.get_mut(&remote) {
602 progress.on_receive_msg();
603 if !reject {
604 debug!(
605 "node {} channel {} try advance remote {} match id to {}",
606 self.local_id,
607 self.channel_id,
608 remote,
609 hint_id - 1u64
610 );
611 progress.try_advance_matched_id(hint_id);
612 } else if is_leader {
613 debug!(
614 "node {} channel {} try reset remote {} next id to {}",
615 self.local_id, self.channel_id, remote, hint_id
616 );
617 progress.try_reset_next_id(msg_id, hint_id);
618 } else {
619 warn!(
620 "node {} channel id {} receive a rejected index request {:?} but I isn't a leader any more",
621 self.local_id, self.channel_id, reject
622 );
623 }
624 }
625 }
626
627 pub fn is_leader(&self) -> bool {
628 self.role == Role::Leader
629 }
630
631 pub fn is_candidate(&self) -> bool {
632 self.role == Role::Candidate
633 }
634
635 pub fn is_follower(&self) -> bool {
636 self.role == Role::Follower
637 }
638
639 pub fn is_student(&self) -> bool {
640 self.role == Role::Student
641 }
642
643 pub fn has_leader(&self) -> bool {
644 self.leader_id != INVALID_NODE_ID
645 }
646
647 pub fn is_remote_matched(&self, remote: u64, last_id: u64) -> bool {
648 if let Some(progress) = self.progress_map.get(&remote) {
649 if progress.match_id == last_id {
650 return true;
651 }
652 }
653 false
654 }
655
656 pub fn missed_channel_ids(&self) -> Vec<u64> {
657 let mut missed_channel_ids = Vec::new();
658 for channel_id in &self.missed_voters {
659 if self.learned_voters.contains(&channel_id) {
660 continue;
661 }
662 missed_channel_ids.push(*channel_id);
663 }
664 missed_channel_ids
665 }
666
667 pub fn update_local_match(&mut self, to: u64) {
668 if let Some(progress) = self.progress_map.get_mut(&self.local_id) {
669 debug!(
670 "node {} channel {} update the local matched entry id from {} to {}",
671 self.local_id, self.channel_id, progress.match_id, to
672 );
673 assert_eq!(progress.match_id <= to, true);
674 progress.match_id = to;
675 }
676 }
677
678 pub fn get_local_match_id(&self) -> u64 {
679 self.progress_map
680 .get(&self.local_id)
681 .map(|p| p.match_id)
682 .unwrap_or(INVALID_ID)
683 }
684
685 pub fn get_local_safety_commit_id(&self) -> u64 {
686 std::cmp::min(self.committed_id, self.get_local_match_id())
687 }
688
689 pub fn log_replicated(&mut self, node_id: u64, next_id: u64) {
690 self.progress_map
691 .get_mut(&node_id)
692 .map(|p| p.step_builded(next_id));
693 }
694
695 pub fn hard_state(&self) -> HardState {
696 HardState {
697 voted_for: self.voted_for,
698 current_term: self.term,
699 }
700 }
701
702 pub fn enter_both_config_stage(
703 &mut self,
704 new_configs: &HashSet<u64>,
705 old_configs: &HashSet<u64>,
706 ) {
707 debug!(
708 "node {} channel {} enter both stage: new configs {:?}, old configs {:?}",
709 self.local_id, self.channel_id, new_configs, old_configs
710 );
711 for (id, progress) in &mut self.progress_map {
712 progress.belongs_to = if old_configs.contains(id) {
713 ConfigStage::Old
714 } else {
715 ConfigStage::Both
716 };
717 }
718
719 for id in new_configs {
720 if let Some(p) = self.progress_map.get_mut(id) {
721 p.belongs_to = ConfigStage::New;
722 continue;
723 }
724 self.progress_map
725 .insert(*id, Progress::new(*id, ConfigStage::New));
726 }
727 }
728
729 pub fn enter_new_config_stage(
730 &mut self,
731 new_configs: &HashSet<u64>,
732 old_configs: &HashSet<u64>,
733 ) {
734 debug!(
735 "node {} channel {} enter new stage: new configs {:?}, old configs {:?}",
736 self.local_id, self.channel_id, new_configs, old_configs
737 );
738 for id in old_configs {
739 self.progress_map.remove(id);
740 }
741 for (_id, progress) in &mut self.progress_map {
742 progress.belongs_to = ConfigStage::New;
743 }
744 }
745
746 pub fn rollback_config_change(
747 &mut self,
748 new_configs: &HashSet<u64>,
749 old_configs: &HashSet<u64>,
750 ) {
751 self.enter_new_config_stage(old_configs, new_configs);
752 }
753
754 pub fn get_member_states(&self) -> HashMap<u64, MemberState> {
755 self.progress_map
756 .iter()
757 .filter(|(id, _)| **id != INDEX_CHANNEL_ID)
758 .map(|(id, _)| (*id, MemberState::default()))
759 .collect::<HashMap<_, _>>()
760 }
761
762 pub fn matched_committed_id(&self, id: u64) -> u64 {
763 let match_id = self.progress_map.get(&id).unwrap().match_id;
764 std::cmp::min(match_id, self.committed_id)
765 }
766}
767
#[cfg(test)]
mod tests {
    use super::*;

    /// `reset` always returns the progress to `Probe` from `last_id + 1`, and a
    /// successful match advance switches it to `Replicate`.
    #[test]
    fn reset_to_probe() {
        let mut p = Progress::new(1, ConfigStage::New);
        let last_id = 10;
        p.reset(last_id);
        assert_eq!(p.next_id, last_id + 1u64);
        assert_eq!(p.match_id, INVALID_ID);
        assert_eq!(p.state, ProgressState::Probe);

        // Never reassigned, so no `mut` (avoids the `unused_mut` warning).
        let hint_id = 5;
        p.try_advance_matched_id(hint_id);
        assert_eq!(p.match_id, hint_id - 1u64);
        assert_eq!(p.next_id, last_id + 1u64);
        assert_eq!(p.state, ProgressState::Replicate);

        p.reset(123);
        assert_eq!(p.state, ProgressState::Probe);
    }

    /// Match/next id transitions for accepted and rejected replies.
    #[test]
    fn update_match_next_id() {
        let mut p = Progress::new(1, ConfigStage::New);
        let last_id = 10;
        p.reset(last_id);
        assert_eq!(p.next_id, last_id + 1u64);
        assert_eq!(p.match_id, INVALID_ID);
        assert_eq!(p.state, ProgressState::Probe);

        let mut hint_id = 5;
        p.try_advance_matched_id(hint_id);
        assert_eq!(p.match_id, hint_id - 1u64);
        assert_eq!(p.next_id, last_id + 1u64);
        assert_eq!(p.state, ProgressState::Replicate);

        // A stale (lower) hint does not move the match id backwards.
        hint_id = 3;
        p.try_advance_matched_id(hint_id);
        assert_eq!(p.match_id, 4);
        assert_eq!(p.next_id, last_id + 1u64);

        p.try_advance_matched_id(last_id + 1u64);
        assert_eq!(p.match_id, last_id);
        assert_eq!(p.next_id, last_id + 1u64);

        // A stale rejection in Replicate state is ignored.
        p.next_id = 100;
        p.try_reset_next_id(hint_id, hint_id);
        assert_eq!(p.match_id, last_id);
        assert_eq!(p.next_id, 100);

        // A fresh rejection in Replicate falls back to probing after match_id.
        p.state = ProgressState::Replicate;
        p.try_reset_next_id(p.next_id, 50);
        assert_eq!(p.match_id, last_id);
        assert_eq!(p.next_id, p.match_id + 1u64);

        // A rejection of next_id while probing jumps to the hint.
        assert_eq!(p.state, ProgressState::Probe);
        p.try_reset_next_id(p.next_id, 50);
        assert_eq!(p.match_id, last_id);
        assert_eq!(p.next_id, 50);
    }
}