1use std::collections::{HashMap, HashSet, VecDeque};
18
19use chrono::prelude::*;
20use log::{debug, error, info, trace, warn};
21use rand::{thread_rng, RngCore};
22
23use crate::constant::*;
24use crate::error::Error;
25use crate::progress::*;
26use crate::storage::*;
27use crate::types::*;
28
/// Tunables for a consensus node.
#[derive(Debug)]
pub struct SdconsOption {
    /// Base election timeout, in ticks.
    ///
    /// NOTE(review): presumably fed to `generate_election_timeout_tick` — confirm at the
    /// call site.
    pub base_election_timeout_tick: u32,

    /// Byte budget handed to `MemStorage::extract_unstable_entries` when collecting
    /// entries that still have to be persisted.
    pub max_stable_bytes: u64,

    /// Byte budget handed to `MemStorage::extract_unapply_entries` when collecting
    /// chosen entries for the application.
    pub max_apply_bytes: u64,

    /// Byte budget for broadcasting. Not referenced in this part of the file;
    /// presumably caps outgoing replication payloads — confirm.
    pub max_bcast_bytes: u64,
}

impl Default for SdconsOption {
    /// Default tunables: a 3-tick base election timeout, 64 MiB stable/apply budgets and
    /// a 128 MiB broadcast budget.
    fn default() -> SdconsOption {
        const MIB: u64 = 1024 * 1024;
        SdconsOption {
            base_election_timeout_tick: 3,
            max_stable_bytes: 64 * MIB,
            max_apply_bytes: 64 * MIB,
            // 128 MiB; the literal used to read `128 * 1204 * 1024` (a digit typo, ~150.5 MiB).
            max_bcast_bytes: 128 * MIB,
        }
    }
}
71
72#[inline(always)]
74fn generate_election_timeout_tick(base: u32) -> u32 {
75 base + thread_rng().next_u32() % base + 1
76}
77
78#[inline(always)]
81fn is_leader_claim_message(msg: &Message, target_channel_id: u64) -> bool {
82 match &msg.detail {
83 MsgDetail::Index(_) | MsgDetail::Declare(_) | MsgDetail::Append(_)
84 if target_channel_id == msg.channel_id =>
85 {
86 true
87 }
88 _ => false,
89 }
90}
91
92#[inline(always)]
95fn unwrap_config_stage(pending_configs: &Option<PendingConfigs>) -> ConfigStage {
96 pending_configs
97 .as_ref()
98 .map(|p| p.stage)
99 .unwrap_or(ConfigStage::New)
100}
101
102#[inline(always)]
104fn is_same_pending_configs(lhs: &Option<PendingConfigs>, rhs: &Option<PendingConfigs>) -> bool {
105 match (lhs, rhs) {
106 (Some(l), Some(r)) => l.eq(r),
107 _ => false,
108 }
109}
110
111#[inline(always)]
112fn active_members(descs: &HashMap<u64, MemberState>) -> Vec<u64> {
113 descs.iter().map(|(id, _d)| *id).collect::<Vec<_>>()
115}
116
117#[inline(always)]
118fn filter_index_channel(channel_ids: &Vec<u64>) -> HashSet<u64> {
119 channel_ids
120 .iter()
121 .cloned()
122 .filter(|id| *id != INDEX_CHANNEL_ID)
123 .collect::<HashSet<_>>()
124}
125
/// Administrative commands that can be issued to a node.
///
/// NOTE(review): the handlers for these commands are outside this section; the variant
/// descriptions below are inferred from their names — confirm against the handler.
pub enum Control {
    /// Presumably asks the node to release memory held by in-memory buffers.
    ReleaseMemory,

    /// Presumably requests a leadership transfer to the given node id.
    TransferLeader(u64),

    /// Presumably forces an immediate election timeout.
    TimeoutNow,

    /// Presumably requests a checkpoint / snapshot.
    Checkpoint,
}
141
142#[derive(Debug)]
154pub struct MsgBuilder {
155 from: u64,
156 to: u64,
157 index_term: u64,
158 channel_id: u64,
159 channel_term: u64,
160 committed_id: u64,
161 first_id: u64,
162 until_id: u64,
163}
164
165impl MsgBuilder {
166 pub fn local_id(&self) -> u64 {
168 self.from
169 }
170
171 pub fn remote_id(&self) -> u64 {
173 self.to
174 }
175
176 pub fn channel_id(&self) -> u64 {
178 self.channel_id
179 }
180
181 pub fn from_id(&self) -> u64 {
183 self.first_id
184 }
185
186 pub fn until_id(&self) -> u64 {
188 self.until_id
189 }
190
191 pub fn build_append_msg(builder: &Self, prev_term: u64, entries: Vec<Entry>) -> Message {
192 assert_ne!(builder.channel_id, INDEX_CHANNEL_ID);
193 Message {
194 from: builder.from,
195 to: builder.to,
196 index_term: builder.index_term,
197 channel_id: builder.channel_id,
198 channel_term: builder.channel_term,
199 detail: MsgDetail::Append(AppendMsg {
200 committed_entry_id: builder.committed_id,
201 prev_entry_id: builder.first_id - 1u64,
202 prev_entry_term: prev_term,
203 entries,
204 }),
205 }
206 }
207
208 pub fn build_index_msg(builder: &Self, prev_term: u64, indexes: Vec<LogIndex>) -> Message {
209 assert_eq!(builder.channel_id, INDEX_CHANNEL_ID);
210 Message {
211 from: builder.from,
212 to: builder.to,
213 index_term: builder.index_term,
214 channel_id: builder.channel_id,
215 channel_term: builder.channel_term,
216 detail: MsgDetail::Index(IndexMsg {
217 committed_index_id: builder.committed_id,
218 prev_index_id: builder.first_id - 1u64,
219 prev_index_term: prev_term,
220 indexes,
221 }),
222 }
223 }
224}
225
226#[derive(Debug)]
227struct RemoteSnapshotRecevingStates {
228 id: u64,
229 states: HashMap<u64, SnapshotRecevingState>,
230}
231
232impl RemoteSnapshotRecevingStates {
233 fn new(id: u64) -> RemoteSnapshotRecevingStates {
234 RemoteSnapshotRecevingStates {
235 id,
236 states: HashMap::new(),
237 }
238 }
239
240 fn is_any_remote_receiving_snapshot(&self) -> bool {
241 self.states.iter().map(|(_, v)| v.receiving).count() > 0
242 }
243
244 fn is_remote_receiving_snapshot(&self, remote_id: u64) -> bool {
245 self.states
246 .get(&remote_id)
247 .map(|s| s.receiving)
248 .unwrap_or(false)
249 }
250
251 fn get_mut_remote_snapshot_state(&mut self, remote_id: u64) -> &mut SnapshotRecevingState {
252 self.states
253 .entry(remote_id)
254 .or_insert(SnapshotRecevingState {
255 receiving: false,
256 start_at: 0,
257 last_heartbeat_at: 0,
258 })
259 }
260
261 fn update_remote_snapshot_state(&mut self, remote_id: u64, receiving: bool) {
262 let e = self.get_mut_remote_snapshot_state(remote_id);
263 e.receiving = receiving;
264 e.last_heartbeat_at = Local::now().timestamp_nanos();
265 e.start_at = Local::now().timestamp_nanos();
266 }
267
268 fn maybe_update_remote_snapshot_state(&mut self, remote_id: u64, is_receving: bool) {
269 let id = self.id;
270 let e = self.get_mut_remote_snapshot_state(remote_id);
271 if e.receiving {
272 e.last_heartbeat_at = Local::now().timestamp_nanos();
273 if !is_receving && e.last_heartbeat_at - e.start_at >= 1000 * 1000 * 1000 {
274 info!("node {} remote {} receiving snapshot timeout", id, remote_id);
276 e.receiving = false;
277 }
278 } else if is_receving {
279 e.receiving = true;
280 e.last_heartbeat_at = Local::now().timestamp_nanos();
281 e.start_at = Local::now().timestamp_nanos();
282 }
283 }
284}
285
286#[derive(Debug)]
288struct TransferingRecord {
289 pub to: u64,
291
292 pub channel_id: u64,
294
295 pub elapsed_tick: usize,
297}
298
299impl Default for TransferingRecord {
300 fn default() -> TransferingRecord {
301 TransferingRecord {
302 to: INVALID_NODE_ID,
303 channel_id: INDEX_CHANNEL_ID,
304 elapsed_tick: 0,
305 }
306 }
307}
308
309impl TransferingRecord {
310 fn new(channel_id: u64, to: u64) -> TransferingRecord {
311 TransferingRecord {
312 to,
313 channel_id,
314 elapsed_tick: 0,
315 }
316 }
317
318 fn doing(&self) -> bool {
320 self.to != INVALID_NODE_ID
321 }
322
323 fn elapsed(&mut self) {
325 if self.to != INVALID_NODE_ID {
326 self.elapsed_tick += 1;
327 }
328 }
329
330 fn timeout(&self) -> bool {
332 self.to != INVALID_NODE_ID && self.elapsed_tick >= 3
334 }
335
336 fn abort(&mut self) {
337 self.reset(INVALID_NODE_ID, INDEX_CHANNEL_ID);
338 }
339
340 fn reset(&mut self, to: u64, channel_id: u64) {
341 self.to = to;
342 self.channel_id = channel_id;
343 self.elapsed_tick = 0;
344 }
345
346 fn initiate(&mut self, channel_id: u64, to: u64) -> bool {
349 if self.to != INVALID_NODE_ID {
350 self.channel_id == channel_id && self.to == to
351 } else {
352 self.reset(to, channel_id);
353 true
354 }
355 }
356
357 fn target_to(&self, channel_id: u64, to: u64) -> bool {
358 self.to == to && self.channel_id == channel_id
359 }
360}
361
362#[derive(Debug)]
364struct PendingConfigs {
365 stage: ConfigStage,
366
367 channel_id: u64,
368 term: u64,
369
370 index_id: u64,
373 entry_id: u64,
374
375 configs: HashSet<u64>,
377
378 new_configs: HashSet<u64>,
380 old_configs: HashSet<u64>,
382}
383
384impl PartialEq for PendingConfigs {
385 fn eq(&self, other: &PendingConfigs) -> bool {
386 if self.index_id == INVALID_ID || other.index_id == INVALID_ID {
389 return false;
390 }
391 self.channel_id == other.channel_id
392 && self.index_id == other.index_id
393 && self.term == other.term
394 }
395}
396
397impl PendingConfigs {
398 fn new(
399 channel_id: u64,
400 index_id: u64,
401 entry_id: u64,
402 term: u64,
403 stage: ConfigStage,
404 configs: HashSet<u64>,
405 old_members: &HashSet<u64>,
406 ) -> PendingConfigs {
407 let new_configs = configs
408 .iter()
409 .filter(|v| !old_members.contains(v))
410 .cloned()
411 .collect::<HashSet<_>>();
412 let old_configs = old_members
413 .iter()
414 .filter(|id| !configs.contains(*id) && **id != INDEX_CHANNEL_ID)
415 .cloned()
416 .collect::<HashSet<_>>();
417 PendingConfigs {
418 stage,
419 channel_id,
420 term,
421 index_id,
422 entry_id,
423 configs,
424 new_configs,
425 old_configs,
426 }
427 }
428
429 fn desc(&self) -> ChangeConfig {
430 ChangeConfig {
431 index_id: self.index_id,
432 entry_id: self.entry_id,
433 term: self.term,
434 stage: self.stage,
435 members: self.configs.clone(),
436 }
437 }
438
439 fn member_stage(&self, id: u64) -> MemberState {
440 MemberState {
441 applied: false,
442 stage: if self.old_configs.contains(&id) {
443 ConfigStage::Old
444 } else if self.new_configs.contains(&id) {
445 ConfigStage::New
446 } else {
447 ConfigStage::Both
448 },
449 }
450 }
451
452 fn get_member_stages(&self) -> HashMap<u64, MemberState> {
453 self.configs
454 .iter()
455 .map(|id| (*id, self.member_stage(*id)))
456 .collect()
457 }
458}
459
460#[derive(Debug)]
461pub(crate) struct Sdcons {
462 option: SdconsOption,
463 random_election_timeout_tick: u32,
464
465 id: u64,
466 log_buffer: MemStorage,
467 channel_ids: Vec<u64>,
468 channels: HashMap<u64, ChannelInfo>,
469 pending_configs: Option<PendingConfigs>,
470
471 next_read_request_id: u64,
472 pending_reads: VecDeque<u64>,
473 undoing_reads: Vec<u64>,
474
475 choosen_id: u64,
476 pending_id: u64,
477 applied_id: u64,
478
479 transfer: TransferingRecord,
480 snapshot_state: SnapshotState,
481 snapshot_receving_states: RemoteSnapshotRecevingStates,
482
483 ready: Ready,
484}
485
486#[derive(Debug)]
490pub(crate) struct Ready {
491 pub first_apply_index_id: u64,
494 pub last_apply_index_id: u64,
495 pub chosen_entries: Vec<Entry>,
496
497 pub unstable_indexes: Vec<LogIndex>,
498 pub unstable_entries: Vec<Entry>,
499
500 pub msgs: HashMap<u64, Vec<Message>>,
501 pub cop_msgs: Option<Vec<MsgBuilder>>,
502
503 pub post_apply: bool,
511 pub should_checkpoint: bool,
512 pub should_stable_metas: bool,
513 pub pending_snapshot: Option<SnapshotDesc>,
514
515 pub roles: HashMap<u64, Role>,
516 pub hard_states: HashMap<u64, HardState>,
517 pub member_states: HashMap<u64, MemberState>,
518 pub committed_stats: HashMap<u64, u64>,
519
520 pub finished_reads: HashMap<u64, u64>, }
522
523impl Default for Ready {
524 fn default() -> Self {
525 Ready {
526 first_apply_index_id: INVALID_ID,
527 last_apply_index_id: INVALID_ID,
528 should_checkpoint: false,
529 should_stable_metas: false,
530 post_apply: false,
531 msgs: HashMap::new(),
532 unstable_indexes: Vec::new(),
533 unstable_entries: Vec::new(),
534 chosen_entries: Vec::new(),
535 cop_msgs: None,
536 pending_snapshot: None,
537 roles: HashMap::new(),
538 hard_states: HashMap::new(),
539 member_states: HashMap::new(),
540 committed_stats: HashMap::new(),
541 finished_reads: HashMap::new(),
542 }
543 }
544}
545
546impl Ready {
547 fn send_msg(&mut self, msg: Message) {
548 assert_ne!(msg.to, INVALID_NODE_ID);
549 assert_ne!(msg.to, INDEX_CHANNEL_ID);
550 self.msgs
551 .entry(msg.channel_id)
552 .or_insert(Vec::new())
553 .push(msg);
554 }
555
556 fn send_cop_msg(&mut self, msg_builder: MsgBuilder) {
557 assert_ne!(msg_builder.to, INDEX_CHANNEL_ID);
558 assert_ne!(msg_builder.to, INVALID_NODE_ID);
559 if let Some(v) = &mut self.cop_msgs {
560 v.push(msg_builder);
561 } else {
562 self.cop_msgs = Some(vec![msg_builder])
563 }
564 }
565}
566
567impl Sdcons {
568 fn assign_index(&mut self, channel_id: u64, entry_id: u64) -> u64 {
569 assert_ne!(channel_id, INDEX_CHANNEL_ID);
570 let index = LogIndex {
571 channel_id,
572 entry_id,
573 index_id: 0,
574 term: self.get_channel_term(INDEX_CHANNEL_ID),
575 context: None,
576 };
577 let index_id = self.log_buffer.allocate(index);
578 debug!(
579 "node {} channel {} assign channel {} entry {} index {}",
580 self.id, INDEX_CHANNEL_ID, channel_id, entry_id, index_id
581 );
582 index_id
583 }
584
585 fn is_index_leader(&self) -> bool {
586 self.is_channel_leader(INDEX_CHANNEL_ID)
587 }
588
589 fn is_index_student(&self) -> bool {
590 self.channels
591 .get(&INDEX_CHANNEL_ID)
592 .map(|i| i.is_student())
593 .unwrap_or(false)
594 }
595
596 fn is_channel_leader(&self, channel_id: u64) -> bool {
597 self.channels
598 .get(&channel_id)
599 .map(|i| i.is_leader())
600 .unwrap_or(false)
601 }
602
603 fn is_channel_candidate(&self, channel_id: u64) -> bool {
604 self.channels
605 .get(&channel_id)
606 .map(|i| i.is_candidate())
607 .unwrap_or(false)
608 }
609
610 fn is_channel_follower(&self, channel_id: u64) -> bool {
611 self.channels
612 .get(&channel_id)
613 .map(|i| i.is_follower())
614 .unwrap_or(false)
615 }
616
617 fn is_local_channel_leader(&self) -> bool {
618 self.is_channel_leader(self.id)
619 }
620
621 fn local_entry_channel_term(&self) -> u64 {
622 self.get_channel_term(self.id)
623 }
624
625 fn next_entry_id(&self) -> u64 {
626 self.channel_next_entry_id(self.id)
627 }
628
629 fn channel_next_entry_id(&self, channel_id: u64) -> u64 {
630 self.log_buffer.channel_last_entry_id(channel_id) + 1u64
631 }
632
633 fn remote_ids(&self) -> Vec<u64> {
634 self.channel_ids
635 .iter()
636 .map(|k| *k)
637 .filter(|k| *k != self.id && *k != INDEX_CHANNEL_ID)
638 .collect::<Vec<_>>()
639 }
640
641 fn get_member_states(&self) -> HashMap<u64, MemberState> {
642 if let Some(pending_configs) = &self.pending_configs {
643 match pending_configs.stage {
644 ConfigStage::New | ConfigStage::Both => {
645 return pending_configs.get_member_stages();
646 }
647 _ => {}
648 }
649 }
650 self.get_index_channel().get_member_states()
651 }
652
653 fn build_channel_if_not_exists(&mut self, channel_id: u64) {
654 if !self.channels.contains_key(&channel_id) {
655 self.channels.insert(
656 channel_id,
657 ChannelInfo::build(self.id, channel_id, self.get_member_states()),
658 );
659 }
660 }
661
662 fn build_msg_header(&self, channel_id: u64) -> Message {
663 let index_term = self.get_index_term();
664 let channel_term = if channel_id == INDEX_CHANNEL_ID {
665 index_term
666 } else {
667 self.get_channel_term(channel_id)
668 };
669 Message {
670 from: self.id,
671 to: INVALID_NODE_ID,
672 index_term,
673 channel_id,
674 channel_term,
675 detail: MsgDetail::None,
676 }
677 }
678
679 fn append_entry(&mut self, channel_id: u64, entry: Entry) {
680 self.log_buffer.append_entry(channel_id, entry)
681 }
682
683 fn advance_choose_progress(&mut self) {
684 let committed_map = self
686 .channels
687 .iter()
688 .map(|(id, c)| (*id, c.get_local_safety_commit_id()))
689 .collect::<HashMap<_, _>>();
690 self.log_buffer.advance_chosen_index_id(committed_map);
691 }
692
693 fn bcast_index_channel<T>(&mut self, log_meta_view: &T)
694 where
695 T: LogMetaView,
696 {
697 let msg_header = self.build_msg_header(INDEX_CHANNEL_ID);
698 let mem_first_id = self.log_buffer.channel_first_entry_id(INDEX_CHANNEL_ID);
699 let mem_last_id = self.log_buffer.channel_last_entry_id(INDEX_CHANNEL_ID);
700 let channel_info = self.channels.get_mut(&INDEX_CHANNEL_ID).unwrap();
701 if !channel_info.is_leader() {
702 return;
703 }
704
705 let config_stage = unwrap_config_stage(&self.pending_configs);
706 let advanced = channel_info.try_advance_committed_id(config_stage);
707 let committed_index_id = channel_info.committed_id;
708 for (id, progress) in &mut channel_info.progress_map {
709 if *id == self.id
710 || progress.is_paused()
711 || self
712 .snapshot_receving_states
713 .is_remote_receiving_snapshot(*id)
714 {
715 continue;
716 }
717
718 if progress.next_id <= mem_first_id {
719 let (stabled_first_id, _) = log_meta_view.range_of(INDEX_CHANNEL_ID);
720 if progress.next_id < stabled_first_id {
721 if let SnapshotState::Creating = self.snapshot_state {
722 continue;
723 } else if let Some(desc) = log_meta_view.latest_snapshot() {
724 debug!(
725 "node {} channel {} replicate snapshot to {}",
726 self.id, INDEX_CHANNEL_ID, *id
727 );
728 let mut msg = msg_header.clone();
729 msg.to = *id;
730 msg.detail = MsgDetail::Snapshot(desc);
731 self.ready.send_msg(msg);
732 self.snapshot_receving_states
733 .update_remote_snapshot_state(*id, true);
734 } else {
735 debug!("node {} channel {} try create snapshot", self.id, INDEX_CHANNEL_ID);
736 self.snapshot_state = SnapshotState::Creating;
737 self.ready.should_checkpoint = true;
738 }
739 } else {
740 let builder = MsgBuilder {
741 from: self.id,
742 to: *id,
743 index_term: msg_header.index_term,
744 channel_id: msg_header.channel_id,
745 channel_term: msg_header.channel_term,
746 committed_id: committed_index_id,
747 first_id: progress.next_id,
748 until_id: mem_first_id,
749 };
750 debug!(
751 "node {} channel {} send build msg: {:?}",
752 self.id, INDEX_CHANNEL_ID, builder
753 );
754 self.ready.send_cop_msg(builder);
755 progress.step_building_msg();
756 }
757 } else if progress.next_id <= mem_last_id || advanced {
758 let prev_index_id = progress.next_id - 1u64;
759 debug!(
760 "node {} channel {} replicate indexes[{}, {}) to {}, advance {}",
761 self.id,
762 INDEX_CHANNEL_ID,
763 progress.next_id,
764 mem_last_id + 1u64,
765 *id,
766 advanced
767 );
768 let prev_index_term = self
769 .log_buffer
770 .index_term(prev_index_id)
771 .unwrap_or(INITIAL_TERM);
772 let indexes = self.log_buffer.index_range(progress.next_id, mem_last_id);
773 progress.replicate_to(mem_last_id);
774 let mut msg = msg_header.clone();
775 msg.to = *id;
776 msg.detail = MsgDetail::Index(IndexMsg {
777 committed_index_id,
778 prev_index_id,
779 prev_index_term,
780 indexes,
781 });
782 self.ready.send_msg(msg);
783 }
784 }
785 }
786
787 fn bcast_entry_channel<T>(&mut self, channel_id: u64, log_meta_view: &T)
788 where
789 T: LogMetaView,
790 {
791 let msg_header = self.build_msg_header(channel_id);
792 let first_id = self.log_buffer.channel_first_entry_id(channel_id);
793 let last_id = self.log_buffer.channel_last_entry_id(channel_id);
794 let channel_info = self.channels.get_mut(&channel_id).unwrap();
795 if !channel_info.is_leader() {
796 return;
797 }
798 let config_stage = unwrap_config_stage(&self.pending_configs);
799 let advanced = channel_info.try_advance_committed_id(config_stage);
800 let committed_entry_id = channel_info.committed_id;
801 for (id, progress) in &mut channel_info.progress_map {
802 if *id == self.id
803 || progress.is_paused()
804 || self
805 .snapshot_receving_states
806 .is_remote_receiving_snapshot(*id)
807 {
808 continue;
809 }
810
811 if progress.next_id <= first_id {
812 let (stabled_first_id, _) = log_meta_view.range_of(INDEX_CHANNEL_ID);
813 if progress.next_id < stabled_first_id {
814 if let SnapshotState::Creating = self.snapshot_state {
815 continue;
816 } else if let Some(desc) = log_meta_view.latest_snapshot() {
817 debug!(
818 "node {} channel {} replicate snapshot to {}",
819 self.id, channel_id, *id
820 );
821 let mut msg = msg_header.clone();
822 msg.to = *id;
823 msg.detail = MsgDetail::Snapshot(desc);
824 self.ready.send_msg(msg);
825 self.snapshot_receving_states
826 .update_remote_snapshot_state(*id, true);
827 } else {
828 debug!("node {} channel {} try create snapshot", self.id, channel_id);
829 self.snapshot_state = SnapshotState::Creating;
830 self.ready.should_checkpoint = true;
831 }
832 } else {
833 let builder = MsgBuilder {
834 from: self.id,
835 to: *id,
836 index_term: msg_header.index_term,
837 channel_id: msg_header.channel_id,
838 channel_term: msg_header.channel_term,
839 committed_id: committed_entry_id,
840 first_id: progress.next_id,
841 until_id: first_id,
842 };
843 debug!("node {} channel {} send build msg: {:?}", self.id, channel_id, builder);
844 self.ready.send_cop_msg(builder);
845 progress.step_building_msg();
846 }
847 } else if progress.next_id <= last_id || advanced {
848 let prev_entry_id = progress.next_id - 1u64;
849 debug!(
850 "node {} channel {} replicate entries[{}, {}) to {}, advance {}",
851 self.id,
852 channel_id,
853 progress.next_id,
854 last_id + 1u64,
855 *id,
856 advanced
857 );
858 let prev_entry_term = self
859 .log_buffer
860 .entry_term(channel_id, prev_entry_id)
861 .unwrap_or(INITIAL_TERM);
862 let entries = self
863 .log_buffer
864 .range_of(channel_id, progress.next_id, last_id);
865 debug!("node {} channel {} entries is {:?}", self.id, channel_id, entries);
866 progress.replicate_to(last_id);
867 let mut msg = msg_header.clone();
868 msg.to = *id;
869 msg.detail = MsgDetail::Append(AppendMsg {
870 committed_entry_id,
871 prev_entry_id,
872 prev_entry_term,
873 entries,
874 });
875 self.ready.send_msg(msg);
876 }
877 }
878 }
879
880 fn extract_chosen_entries(&mut self) {
881 let (first_index_id, last_index_id, entries) = self
882 .log_buffer
883 .extract_unapply_entries(self.option.max_apply_bytes);
884 self.ready.first_apply_index_id = first_index_id;
885 self.ready.last_apply_index_id = last_index_id;
886 self.ready.chosen_entries = entries;
887 }
888
889 fn extract_unstable_entries(&mut self) {
890 self.ready.unstable_entries = self
891 .log_buffer
892 .extract_unstable_entries(self.option.max_stable_bytes);
893 self.ready.unstable_indexes = self.log_buffer.extract_unstable_indexes();
894 }
895
896 fn bcast_each_channels<L>(&mut self, log_meta_view: &L)
897 where
898 L: LogMetaView,
899 {
900 for idx in 0..self.channel_ids.len() {
904 let channel_id = self.channel_ids[idx];
905 if channel_id == INDEX_CHANNEL_ID {
906 self.bcast_index_channel(log_meta_view);
907 } else {
908 self.bcast_entry_channel(channel_id, log_meta_view);
909 }
910 }
911 }
912
913 fn collect_state_snapshot(&mut self) {
914 self.ready.hard_states = self
915 .channels
916 .iter()
917 .map(|(i, c)| (*i, c.hard_state()))
918 .collect::<HashMap<_, _>>();
919 self.ready.roles = self
920 .channels
921 .iter()
922 .map(|(i, c)| (*i, c.role))
923 .collect::<HashMap<_, _>>();
924 self.ready.committed_stats = self
925 .channels
926 .iter()
927 .map(|(i, c)| (*i, c.get_local_safety_commit_id()))
928 .collect::<HashMap<_, _>>();
929 self.ready.member_states = self.get_member_states();
930 }
931
932 fn force_remote_step_down(
933 &mut self,
934 to: u64,
935 old_term: u64,
936 channel_id: u64,
937 detail: &MsgDetail,
938 ) {
939 debug!("node {} channel {} force remote {} step down because: receive staled {} msg, old term: {}",
940 self.id, channel_id, to, detail, old_term);
941 let mut msg_header = self.build_msg_header(channel_id);
942 msg_header.to = to;
943 self.ready.send_msg(msg_header);
944 }
945
946 fn reject_staled_message(&mut self, msg: &Message) -> bool {
947 if msg.index_term < self.get_channel_term(INDEX_CHANNEL_ID) {
948 self.force_remote_step_down(msg.from, msg.index_term, INDEX_CHANNEL_ID, &msg.detail);
949 true
950 } else if msg.channel_id != INDEX_CHANNEL_ID
951 && msg.channel_term < self.get_channel_term(msg.channel_id)
952 {
953 self.force_remote_step_down(msg.from, msg.channel_term, msg.channel_id, &msg.detail);
954 true
955 } else {
956 false
957 }
958 }
959
960 fn maybe_transfering_finised(&mut self, channel_id: u64, from: u64) {
961 if self.transfer.target_to(channel_id, from) {
962 info!(
963 "node {} channel {} transfer leadership to {} success",
964 self.id, channel_id, from
965 );
966 self.transfer = TransferingRecord::default();
967 }
968 }
969
970 fn try_advance_channel_term(&mut self, msg: &Message) {
971 let mut active_channels = vec![(INDEX_CHANNEL_ID, msg.index_term)];
972 if msg.channel_id != INDEX_CHANNEL_ID {
973 active_channels.push((msg.channel_id, msg.channel_term));
974 }
975
976 let mut advanced = false;
977 for (channel_id, term) in active_channels {
978 let target_leader_id = if is_leader_claim_message(msg, channel_id) {
979 msg.from
980 } else {
981 INVALID_NODE_ID
982 };
983 self.build_channel_if_not_exists(channel_id);
984 let last_id = self.log_buffer.channel_last_entry_id(channel_id);
985 let channel_info = self.channels.get_mut(&channel_id).unwrap();
986 if channel_info.term < term {
987 channel_info.to_follower(target_leader_id, term, last_id, "high-term");
988 self.maybe_transfering_finised(channel_id, msg.from);
989 advanced = true;
990 } else if (channel_info.is_candidate() || channel_info.has_leader())
991 && target_leader_id != INVALID_NODE_ID
992 {
993 channel_info.to_follower(target_leader_id, term, last_id, "claim-leader");
994 advanced = true;
995 }
996 }
997
998 if advanced {
999 self.submit_read_task();
1001 }
1002 }
1003
1004 fn get_index_channel(&self) -> &ChannelInfo {
1005 self.channels.get(&INDEX_CHANNEL_ID).unwrap()
1006 }
1007
1008 fn get_index_term(&self) -> u64 {
1009 self.channels.get(&INDEX_CHANNEL_ID).unwrap().term
1010 }
1011
1012 fn get_channel_term(&self, channel_id: u64) -> u64 {
1013 self.channels
1014 .get(&channel_id)
1015 .map(|c| c.term)
1016 .unwrap_or(INITIAL_TERM)
1017 }
1018
1019 fn maybe_assign_indexes(&mut self, channel_id: u64, first_entry_id: u64, last_entry_id: u64) {
1020 if self.is_index_leader() {
1022 let first_entry_id = std::cmp::max(
1023 self.log_buffer.channel_last_assigned_entry_id(channel_id) + 1u64,
1024 first_entry_id,
1025 );
1026 for idx in first_entry_id..(last_entry_id + 1) {
1027 self.assign_index(channel_id, idx);
1028 }
1029 }
1030 }
1031
1032 fn abort_pending_configs(&mut self, reason: &str) {
1033 assert_eq!(self.pending_configs.is_some(), true);
1034 self.ready.should_stable_metas = true;
1035 let pending_configs = self.pending_configs.as_ref().unwrap();
1036 info!(
1037 "node {} channel {} because {}, abort and rollback {:?}",
1038 self.id,
1039 pending_configs.channel_id,
1040 reason,
1041 pending_configs.desc()
1042 );
1043 for (_channe_id, channel_info) in &mut self.channels {
1044 channel_info
1045 .rollback_config_change(&pending_configs.new_configs, &pending_configs.old_configs);
1046 }
1047 }
1048
1049 fn maybe_update_config_change_stage(
1050 &mut self,
1051 channel_id: u64,
1052 first_entry_id: u64,
1053 _last_entry_id: u64,
1054 ) {
1055 assert_eq!(self.is_channel_follower(channel_id), true);
1056
1057 if let Some(p) = &self.pending_configs {
1060 if p.channel_id == channel_id && first_entry_id <= p.entry_id {
1061 self.abort_pending_configs(&"log entries was truncated");
1062 }
1063 }
1064
1065 let entry = match self.log_buffer.channel_last_config_change_entry(channel_id) {
1066 Some(e) if e.entry_id >= first_entry_id => e,
1067 _ => return,
1068 };
1069
1070 let config_change = entry.configs.unwrap();
1071 info!("node {} channel {} recieve {:?}", self.id, channel_id, config_change);
1072
1073 let old_members = self.channel_ids.iter().cloned().collect::<HashSet<_>>();
1074 self.pending_configs = Some(PendingConfigs::new(
1075 entry.channel_id,
1076 config_change.index_id,
1077 config_change.entry_id,
1078 config_change.term,
1079 config_change.stage,
1080 config_change.members,
1081 &old_members,
1082 ));
1083
1084 self.update_channels_config_stage();
1085 }
1086
1087 fn handle_append(&mut self, from: u64, channel_id: u64, msg: AppendMsg) {
1088 debug!(
1089 "node {} channel {} receive append from {}: {:?}",
1090 self.id, channel_id, from, msg
1091 );
1092
1093 assert_ne!(channel_id, INDEX_CHANNEL_ID);
1094 let mut reply_msg = AppendReplyMsg {
1095 reject: false,
1096 entry_id: msg.prev_entry_id + 1u64,
1097 hint_id: 0,
1098 };
1099 let length = msg.entries.len();
1100 let committed_entry_id = self.channels.get(&channel_id).unwrap().committed_id;
1101 if msg.prev_entry_id < committed_entry_id {
1102 reply_msg.reject = false;
1103 reply_msg.hint_id = committed_entry_id + 1u64;
1104 } else if self.log_buffer.is_term_miss_matched(
1105 channel_id,
1106 msg.prev_entry_id,
1107 msg.prev_entry_term,
1108 ) {
1109 let reject_entry_id =
1110 self.log_buffer
1111 .find_conflict(channel_id, msg.prev_entry_id, msg.prev_entry_term);
1112 reply_msg.reject = true;
1113 reply_msg.hint_id = reject_entry_id;
1114 self.channels.get_mut(&channel_id).unwrap().reset_tick();
1115 } else {
1116 let (first_entry_id, last_entry_id) =
1117 self.log_buffer.append_entries(channel_id, msg.entries);
1118 reply_msg.reject = false;
1119 reply_msg.hint_id = last_entry_id + 1;
1120 self.maybe_update_config_change_stage(channel_id, first_entry_id, last_entry_id);
1121 self.maybe_assign_indexes(channel_id, first_entry_id, last_entry_id);
1122 let channel_info = self.channels.get_mut(&channel_id).unwrap();
1123 channel_info.update_committed_id(std::cmp::min(last_entry_id, msg.committed_entry_id));
1124 channel_info.reset_tick();
1125 }
1126
1127 debug!(
1128 "node {} channel {} receive append from {}, prev id {}, prev term {}, length {}, reply: {:?}",
1129 self.id, channel_id, from,
1130 msg.prev_entry_id, msg.prev_entry_term, length,
1131 reply_msg
1132 );
1133 let mut msg_header = self.build_msg_header(channel_id);
1134 msg_header.to = from;
1135 msg_header.detail = MsgDetail::AppendReply(reply_msg);
1136 self.ready.send_msg(msg_header);
1137 }
1138
1139 fn handle_append_reply(&mut self, from: u64, channel_id: u64, msg: AppendReplyMsg) {
1140 debug!("node {} channel {} receive from {}: {:?}", self.id, channel_id, from, msg);
1141 match self.channels.get_mut(&channel_id) {
1142 Some(c) => c.update_progress(from, msg.reject, msg.entry_id, msg.hint_id),
1143 None => {}
1144 };
1145 }
1146
1147 fn handle_index(&mut self, from: u64, msg: IndexMsg) {
1148 info!(
1149 "node {} channel {} receive index msg from {}: {:?}",
1150 self.id, INDEX_CHANNEL_ID, from, msg
1151 );
1152 let mut reply_msg = IndexReplyMsg {
1153 reject: false,
1154 index_id: msg.prev_index_id + 1u64,
1155 hint_id: 0,
1156 };
1157 let length = msg.indexes.len();
1158 let committed_index_id = self.channels.get(&INDEX_CHANNEL_ID).unwrap().committed_id;
1159 if msg.prev_index_id < committed_index_id {
1160 reply_msg.reject = false;
1161 reply_msg.hint_id = committed_index_id + 1u64;
1162 } else if self.log_buffer.is_term_miss_matched(
1163 INDEX_CHANNEL_ID,
1164 msg.prev_index_id,
1165 msg.prev_index_term,
1166 ) {
1167 let reject_index_id = self
1168 .log_buffer
1169 .find_index_conflict(msg.prev_index_id, msg.prev_index_term);
1170 reply_msg.reject = true;
1171 reply_msg.hint_id = reject_index_id;
1172 self.channels
1173 .get_mut(&INDEX_CHANNEL_ID)
1174 .unwrap()
1175 .reset_tick();
1176 } else {
1177 let last_index_id = self.log_buffer.extend_indexes(msg.indexes);
1178 reply_msg.reject = false;
1179 reply_msg.hint_id = last_index_id + 1;
1180 let channel_info = self.channels.get_mut(&INDEX_CHANNEL_ID).unwrap();
1181 channel_info.update_committed_id(std::cmp::min(last_index_id, msg.committed_index_id));
1182 channel_info.reset_tick();
1183 }
1184
1185 debug!(
1186 "node {} channel {} receive index from {}, prev id {}, prev term {}, length {}, reply: {:?}",
1187 self.id, INDEX_CHANNEL_ID, from,
1188 msg.prev_index_id, msg.prev_index_term, length,
1189 reply_msg
1190 );
1191 let mut msg_header = self.build_msg_header(INDEX_CHANNEL_ID);
1192 msg_header.to = from;
1193 msg_header.detail = MsgDetail::IndexReply(reply_msg);
1194 self.ready.send_msg(msg_header);
1195 }
1196
1197 fn handle_index_reply(&mut self, from: u64, msg: IndexReplyMsg) {
1198 match self.channels.get_mut(&INDEX_CHANNEL_ID) {
1199 Some(c) => {
1200 c.update_progress(from, msg.reject, msg.index_id, msg.hint_id);
1201 if !msg.reject {
1202 self.advance_transfer_progress(from);
1203 }
1204 }
1205 None => {}
1206 };
1207 }
1208
1209 fn is_refreshed_index(&self, id: u64, term: u64) -> bool {
1210 let last_index_id = self.log_buffer.last_index_id();
1211 let last_index_term = self.log_buffer.last_index_term();
1212 term > last_index_term
1214 || (term == last_index_term && id >= last_index_id)
1216 }
1217
1218 fn handle_vote(&mut self, from: u64, msg: VoteMsg) {
1219 let mut detail = VoteReplyMsg { reject: true };
1220 if self.is_refreshed_index(msg.last_index_id, msg.last_index_term) {
1221 detail.reject = !self
1222 .channels
1223 .get_mut(&INDEX_CHANNEL_ID)
1224 .unwrap()
1225 .try_make_promise(from);
1226 }
1227
1228 info!(
1229 "node {} channel {} receive remote {}: {:?}, reply reject: {}",
1230 self.id, INDEX_CHANNEL_ID, from, msg, detail.reject
1231 );
1232
1233 let mut msg_header = self.build_msg_header(INDEX_CHANNEL_ID);
1234 msg_header.to = from;
1235 msg_header.detail = MsgDetail::VoteReply(detail);
1236 self.ready.send_msg(msg_header);
1237 }
1238
1239 fn handle_vote_reply(&mut self, from: u64, msg: VoteReplyMsg) {
1240 info!(
1241 "node {} channel {} receive remote {}: {:?}",
1242 self.id, INDEX_CHANNEL_ID, from, msg
1243 );
1244 if !msg.reject {
1245 self.receive_granted_vote(from);
1246 }
1247 }
1248
1249 fn receive_granted_vote(&mut self, from: u64) {
1250 let config_stage = unwrap_config_stage(&self.pending_configs);
1251 let channel_info = self.channels.get_mut(&INDEX_CHANNEL_ID).unwrap();
1252 channel_info.receive_promise(from);
1253 if channel_info.receive_majority_promise(config_stage) {
1254 let last_index_id = self.log_buffer.last_index_id();
1255 channel_info.to_student(last_index_id, "granted");
1256 let missed_channel_ids = channel_info.missed_channel_ids();
1257 if missed_channel_ids.len() > 0 {
1258 debug!(
1260 "node {} channel {} there have {} missed channel {:?} need to recovery",
1261 self.id,
1262 INDEX_CHANNEL_ID,
1263 missed_channel_ids.len(),
1264 missed_channel_ids
1265 );
1266 for channel_id in missed_channel_ids {
1267 self.bcast_prepare_request(channel_id, true);
1270 }
1271 } else {
1272 debug!(
1274 "node {} channel {} no any missed channel exists",
1275 self.id, INDEX_CHANNEL_ID
1276 );
1277 self.all_channel_already_learned();
1278 }
1279 }
1280 }
1281
    /// Handles a `Prepare` request for `channel_id` from `from` and always replies with
    /// `PrepareReply`.
    ///
    /// Non-learn and learn requests differ:
    /// - For a non-learn prepare, the channel is created if missing, and the reply is
    ///   rejected unless the channel accepts a promise to `from`.
    /// - Learn prepares never touch the channel here and are never rejected.
    ///
    /// When the reply is not rejected and the local log extends past the requester's
    /// committed id, the reply carries the metas of the local entries after that committed id.
    fn handle_prepare(&mut self, from: u64, channel_id: u64, msg: PrepareMsg) {
        let mut detail = PrepareReplyMsg {
            reject: false,
            learn: msg.learn,
            entry_metas: Vec::new(),
        };
        if !msg.learn {
            self.build_channel_if_not_exists(channel_id);
            detail.reject = !self
                .channels
                .get_mut(&channel_id)
                .unwrap()
                .try_make_promise(from);
        }

        if !detail.reject {
            let last_entry_id = self.log_buffer.channel_last_entry_id(channel_id);
            let first_entry_id = self.log_buffer.channel_first_entry_id(channel_id);
            let mut begin_entry_id = msg.committed_entry_id + 1u64;
            // NOTE(review): ids below the channel's first entry id are skipped, presumably
            // because the first entry id marks an already-compacted position. The check uses
            // `<` but then jumps to `first_entry_id + 1`, so a `begin_entry_id` equal to
            // `first_entry_id` is NOT adjusted. Confirm whether `<=` was intended.
            if begin_entry_id < first_entry_id {
                begin_entry_id = first_entry_id + 1u64;
            }
            // Only ship metas when the local log actually extends past the requester's commit.
            if msg.committed_entry_id < last_entry_id {
                detail.entry_metas =
                    self.log_buffer
                        .range_of_metas(channel_id, begin_entry_id, last_entry_id);
            }
        }

        info!(
            "node {} channel {} receive remote {}: {:?}, reply {}",
            self.id, channel_id, from, msg, detail.reject
        );

        let mut msg_header = self.build_msg_header(channel_id);
        msg_header.to = from;
        msg_header.detail = MsgDetail::PrepareReply(detail);
        self.ready.send_msg(msg_header);
    }
1323
1324 fn handle_prepare_reply(&mut self, from: u64, channel_id: u64, msg: PrepareReplyMsg) {
1325 assert_ne!(channel_id, INDEX_CHANNEL_ID);
1326 debug!("node {} channel {} receive remote {}: {:?}", self.id, channel_id, from, msg);
1327 if msg.reject {
1328 return;
1329 }
1330
1331 self.receive_prepare_promise(from, channel_id, msg.learn, msg.entry_metas);
1332 }
1333
1334 fn handle_declare(&mut self, from: u64, channel_id: u64, msg: DeclareMsg) {
1335 let last_id = self.log_buffer.channel_last_entry_id(channel_id);
1336 debug!(
1337 "node {} channel {} receive {} declare msg, committed id {}, local last id {}",
1338 self.id, channel_id, from, msg.committed_id, last_id
1339 );
1340 if last_id < msg.committed_id {
1341 debug!("receive unexpect declare msg");
1343 } else {
1344 self.build_channel_if_not_exists(channel_id);
1346 let channel_info = self.channels.get_mut(&channel_id).unwrap();
1347 channel_info.update_committed_id(msg.committed_id);
1348 channel_info.reset_tick();
1349 }
1350
1351 let mut msg_header = self.build_msg_header(channel_id);
1352 msg_header.to = from;
1353 msg_header.detail = MsgDetail::DeclareReply(DeclareReplyMsg {
1354 receiving_snapshot: if let SnapshotState::Loading = self.snapshot_state {
1355 true
1356 } else {
1357 false
1358 },
1359 });
1360 self.ready.send_msg(msg_header);
1361 }
1362
1363 fn handle_declare_reply(&mut self, from: u64, channel_id: u64, msg: DeclareReplyMsg) {
1364 match self.channels.get_mut(&channel_id) {
1365 Some(c) => {
1366 c.on_receive_msg(from);
1367 self.snapshot_receving_states
1368 .maybe_update_remote_snapshot_state(from, msg.receiving_snapshot);
1369 }
1370 None => {}
1371 };
1372 }
1373
1374 fn handle_read(&mut self, from: u64, msg: ReadMsg) {
1375 if !self.is_index_leader() {
1376 debug!(
1377 "node {} channel {} recieve read from {}: {:?}, but I am not index leader",
1378 self.id, INDEX_CHANNEL_ID, from, msg
1379 );
1380 return;
1381 }
1382
1383 debug!(
1384 "node {} channel {} recieve read from {}: {:?}",
1385 self.id, INDEX_CHANNEL_ID, from, msg
1386 );
1387 let mut msg_header = self.build_msg_header(INDEX_CHANNEL_ID);
1388 msg_header.to = from;
1389 msg_header.detail = MsgDetail::ReadReply(ReadReplyMsg {
1390 request_id: msg.request_id,
1391 recommend_id: self.get_index_channel().current_term_safe_commit_id(),
1392 });
1393 self.ready.send_msg(msg_header);
1394 }
1395
1396 fn apply_read_reply(&mut self, request_id: u64, recommend_id: u64) {
1397 let mut idx = 0;
1398 for value in &self.pending_reads {
1399 if *value > request_id {
1400 break;
1401 }
1402 self.ready.finished_reads.insert(*value, recommend_id);
1405 idx += 1;
1406 }
1407 self.pending_reads.drain(0..idx);
1408 }
1409
1410 fn handle_read_reply(&mut self, from: u64, msg: ReadReplyMsg) {
1411 self.apply_read_reply(msg.request_id, msg.recommend_id);
1412 }
1413
1414 fn handle_timeout_now(&mut self, from: u64, channel_id: u64) {
1415 if !self.channel_ids.contains(&self.id) {
1416 warn!(
1417 "node {} receive timeout-now from {}, but current membership is empty",
1418 self.id, from
1419 );
1420 return;
1421 }
1422
1423 let last_id = self.log_buffer.channel_last_entry_id(channel_id);
1424 let channel_info = self.channels.get_mut(&channel_id).unwrap();
1425 channel_info.to_candidate(last_id, "timeout-now");
1426 if channel_id == INDEX_CHANNEL_ID {
1427 self.bcast_vote_request();
1428 } else {
1429 self.bcast_prepare_request(channel_id, false);
1430 }
1431 }
1432
1433 fn handle_snapshot(&mut self, from: u64, desc: SnapshotDesc) {
1434 if self.snapshot_state != SnapshotState::None {
1436 warn!(
1437 "node {} already in {:?} snapshot stage, ignore new snapshot",
1438 self.id, self.snapshot_state
1439 );
1440 return;
1441 }
1442
1443 info!("node {} start receiving snapshot from {}: {:?}", self.id, from, desc);
1444 self.ready.pending_snapshot = Some(desc);
1445 self.snapshot_state = SnapshotState::Loading;
1446 }
1447
1448 fn handle_snapshot_reply(&mut self, from: u64, msg: SnapshotReplyMsg) {
1449 self.snapshot_receving_states
1450 .update_remote_snapshot_state(from, false);
1451 if msg.received {
1452 info!("node {} remote {} receive snapshot finished", self.id, from);
1453 for (channel_id, next_id) in &msg.hints {
1454 match self.channels.get_mut(channel_id) {
1455 Some(c) => c.update_progress(from, false, *next_id, *next_id),
1456 None => {}
1457 };
1458 }
1459 } else {
1460 debug!("node {} remote {} reject snapshot, try trigger new once", self.id, from);
1462 }
1463 }
1464
    /// Routes an incoming message to the handler for its `MsgDetail` variant.
    ///
    /// Handlers receive the sender (`msg.from`) and, where they need it, the channel the
    /// message targets (`msg.channel_id`). `MsgDetail::None` is ignored.
    ///
    /// # Panics
    /// Panics on any variant not listed here. In debug builds it also panics if the message
    /// claims to come from this node.
    fn dispatch_message(&mut self, msg: Message) {
        debug_assert_ne!(
            msg.from, self.id,
            "node {} receive a message {:?} from itself",
            self.id, msg
        );

        match msg.detail {
            MsgDetail::Append(d) => self.handle_append(msg.from, msg.channel_id, d),
            MsgDetail::AppendReply(d) => self.handle_append_reply(msg.from, msg.channel_id, d),
            MsgDetail::Index(d) => self.handle_index(msg.from, d),
            MsgDetail::IndexReply(d) => self.handle_index_reply(msg.from, d),
            MsgDetail::Vote(d) => self.handle_vote(msg.from, d),
            MsgDetail::VoteReply(d) => self.handle_vote_reply(msg.from, d),
            MsgDetail::Prepare(d) => self.handle_prepare(msg.from, msg.channel_id, d),
            MsgDetail::PrepareReply(d) => self.handle_prepare_reply(msg.from, msg.channel_id, d),
            MsgDetail::Declare(d) => self.handle_declare(msg.from, msg.channel_id, d),
            MsgDetail::DeclareReply(d) => self.handle_declare_reply(msg.from, msg.channel_id, d),
            MsgDetail::Snapshot(d) => self.handle_snapshot(msg.from, d),
            MsgDetail::SnapshotReply(d) => self.handle_snapshot_reply(msg.from, d),
            MsgDetail::Read(d) => self.handle_read(msg.from, d),
            MsgDetail::ReadReply(d) => self.handle_read_reply(msg.from, d),
            MsgDetail::TimeoutNow => self.handle_timeout_now(msg.from, msg.channel_id),
            MsgDetail::None => {}
            // `msg.detail` is only moved in the arms above, so `msg` is still whole here.
            _ => panic!("unknown message {:?}", msg),
        }
    }
1492
1493 fn bcast_heartbeats(&mut self, channel_id: u64) {
1494 let channel_info = self.channels.get(&channel_id).unwrap();
1495 debug!(
1496 "node {} channel {} bcast heartbeats, committed id {}",
1497 self.id, channel_id, channel_info.committed_id
1498 );
1499 let msg_header = self.build_msg_header(channel_id);
1500 for id in self.remote_ids() {
1501 let matched_committed_id = channel_info.matched_committed_id(id);
1502 debug!(
1503 "node {} channel {} send heartbeat to {}, matched committed id {}",
1504 self.id, channel_id, id, matched_committed_id
1505 );
1506 let mut msg = msg_header.clone();
1507 msg.to = id;
1508 msg.detail = MsgDetail::Declare(DeclareMsg {
1509 committed_id: matched_committed_id,
1510 });
1511 self.ready.send_msg(msg);
1512 }
1513 }
1514
1515 fn bcast_vote_request(&mut self) {
1516 let mut msg_header = self.build_msg_header(INDEX_CHANNEL_ID);
1517 msg_header.detail = MsgDetail::Vote(VoteMsg {
1518 last_index_id: self.log_buffer.last_index_id(),
1519 last_index_term: self.log_buffer.last_index_term(),
1520 });
1521 for to in self.remote_ids() {
1522 let mut msg = msg_header.clone();
1523 msg.to = to;
1524 self.ready.send_msg(msg);
1525 }
1526
1527 self.receive_granted_vote(self.id);
1528 }
1529
    /// Advances the election clock of `channel_id` by one tick.
    ///
    /// Returns `true` when the caller (`all_channels_tick`) should broadcast for this channel:
    /// - heartbeats if the channel is leader;
    /// - vote/prepare requests if it just became a candidate.
    ///
    /// Until `elapsed_tick` reaches `random_elect_timeout_tick`, it simply reports whether the
    /// channel is leader. Once the timeout is reached, the node-wide randomized timeout is
    /// regenerated and the channel role decides the outcome:
    /// - `Leader`/`Student`: restart the clock. If `advance_quorum_lease()` falls below the
    ///   stage majority, step down to follower in the same term ("lost-quorum") and return
    ///   `false`. Otherwise return whether the channel is leader.
    /// - `Follower`/`Candidate` on a node in the membership: become candidate and return
    ///   `true`. This only applies for the index channel, the node's own channel, or any
    ///   channel when this node leads the index channel.
    /// - any other case: become a leaderless follower with `term + 1` and return `false`.
    fn receive_channel_tick(&mut self, channel_id: u64, random_elect_timeout_tick: u32) -> bool {
        // Read everything needed from `self` up front, before `channel_info` mutably
        // borrows `self.channels`.
        let local_id = self.id;
        let config_stage = unwrap_config_stage(&self.pending_configs);
        let contains_self = self.channel_ids.contains(&local_id);
        let is_index_leader = self.is_index_leader();
        let last_id = self.log_buffer.channel_last_entry_id(channel_id);
        let channel_info = self.channels.get_mut(&channel_id).unwrap();

        channel_info.elapsed_tick += 1;
        if channel_info.elapsed_tick >= random_elect_timeout_tick {
            // The randomized timeout is shared by all channels; re-roll it on every expiry.
            self.random_election_timeout_tick =
                generate_election_timeout_tick(self.option.base_election_timeout_tick);
            return match &channel_info.role {
                Role::Leader | Role::Student => {
                    channel_info.elapsed_tick = 0;
                    if channel_info.advance_quorum_lease()
                        < channel_info.stage_majority(config_stage)
                    {
                        channel_info.to_follower(
                            INVALID_NODE_ID,
                            channel_info.term,
                            last_id,
                            "lost-quorum",
                        );
                        false
                    } else {
                        channel_info.is_leader()
                    }
                }
                Role::Follower | Role::Candidate
                    if contains_self
                        && (channel_id == INDEX_CHANNEL_ID
                            || is_index_leader
                            || channel_id == local_id) =>
                {
                    channel_info.to_candidate(last_id, "election timeout");
                    true
                }
                _ => {
                    channel_info.to_follower(
                        INVALID_NODE_ID,
                        channel_info.term + 1,
                        last_id,
                        "election timeout",
                    );
                    false
                }
            };
        }

        return Role::Leader == channel_info.role;
    }
1582
1583 fn append_empty_entry(&mut self, channel_id: u64) {
1584 let entry_id = self.channel_next_entry_id(channel_id);
1585 let entry = Entry {
1586 request_id: INTERNAL_REQUEST,
1587 channel_id: channel_id,
1588 entry_id: entry_id,
1589 index_id: INVALID_ID,
1590 channel_term: self.get_channel_term(channel_id),
1591 message: Vec::new(),
1592 context: None,
1593 configs: None,
1594 };
1595
1596 self.append_entry(channel_id, entry);
1597 if self.is_index_leader() {
1598 self.assign_index(channel_id, entry_id);
1599 }
1600 }
1601
1602 fn channel_already_learned(&mut self, channel_id: u64) {
1603 let channel_info = self.channels.get_mut(&INDEX_CHANNEL_ID).unwrap();
1604 if channel_info.is_leader() {
1605 debug!(
1606 "node {} already step channel {} leader, ignore staled learn msg from channel {}",
1607 self.id, INDEX_CHANNEL_ID, channel_id
1608 );
1609 return;
1610 }
1611
1612 channel_info.learned_voters.insert(channel_id);
1613
1614 if channel_info.learned_voters.len() == channel_info.missed_voters.len() {
1617 assert_eq!(channel_info.learned_voters, channel_info.missed_voters);
1618 self.all_channel_already_learned();
1619 }
1620 }
1621
1622 fn maybe_rebuild_pending_configs(&mut self) {
1623 let pending_configs = rebuild_pending_configs(self.id, &self.log_buffer, &self.channel_ids);
1624 if pending_configs.is_none()
1625 || is_same_pending_configs(&self.pending_configs, &pending_configs)
1626 {
1627 return;
1628 }
1629
1630 if self.pending_configs.is_some() {
1631 self.abort_pending_configs("find refreshed pending configs");
1632 }
1633 self.pending_configs = pending_configs;
1634 self.update_channels_config_stage();
1635 }
1636
1637 fn all_channel_already_learned(&mut self) {
1638 let last_index_id = self.log_buffer.last_index_id();
1639 let channel_info = self.channels.get_mut(&INDEX_CHANNEL_ID).unwrap();
1640 channel_info.to_leader(last_index_id, "learned");
1641
1642 let mut recovery_channels: Vec<u64> = channel_info.missed_voters.iter().cloned().collect();
1646 recovery_channels.extend(
1647 self.channel_ids
1648 .iter()
1649 .filter(|id| !channel_info.missed_voters.contains(*id) && **id != INDEX_CHANNEL_ID)
1650 .cloned(),
1651 );
1652 info!(
1653 "node {} channel {} learned voters {:?}, missed voters {:?}",
1654 self.id, INDEX_CHANNEL_ID, channel_info.learned_voters, channel_info.missed_voters
1655 );
1656 for channel_id in recovery_channels {
1657 let mut last_entry_id = self.log_buffer.channel_last_entry_id(channel_id);
1660 let last_assigned_entry_id = self.log_buffer.channel_last_assigned_entry_id(channel_id);
1661 let last_learned_entry_id = self
1662 .channels
1663 .get_mut(&channel_id)
1664 .unwrap()
1665 .last_learned_entry_meta()
1666 .id;
1667 if last_entry_id < last_learned_entry_id {
1668 last_entry_id = last_learned_entry_id;
1670 }
1671 if last_entry_id < last_assigned_entry_id {
1672 warn!("node {} channel {} recovery channel {} indexes: the entries ({}, {}] is missing",
1673 self.id, INDEX_CHANNEL_ID, channel_id, last_assigned_entry_id, last_entry_id);
1674 continue;
1675 }
1676 info!(
1677 "node {} channel {} re-assign index to channel {} recoveried entries ({}, {}]",
1678 self.id, INDEX_CHANNEL_ID, channel_id, last_assigned_entry_id, last_entry_id
1679 );
1680 for id in (last_assigned_entry_id + 1u64)..(last_entry_id + 1u64) {
1681 self.assign_index(channel_id, id);
1682 }
1683 }
1684
1685 self.assign_index(self.id, INVALID_ID);
1686 self.maybe_rebuild_pending_configs();
1687 self.submit_read_task();
1688 }
1689
    /// Records a prepare promise from `from` for the non-index channel `channel_id`.
    ///
    /// Any `entry_metas` carried by the promise are offered to the channel first. Once a
    /// majority (for the current config stage) has promised, one of two paths runs:
    /// - a learn round for a channel other than the local one reports the channel as learned
    ///   to the index-channel recovery (`channel_already_learned`);
    /// - otherwise the channel steps to leader, and the local channel additionally appends
    ///   an empty entry.
    ///
    /// Panics if `channel_id` is the index channel.
    fn receive_prepare_promise(
        &mut self,
        from: u64,
        channel_id: u64,
        learn: bool,
        entry_metas: Vec<EntryMeta>,
    ) {
        assert_ne!(channel_id, INDEX_CHANNEL_ID);
        let config_stage = unwrap_config_stage(&self.pending_configs);
        let channel_info = self.channels.get_mut(&channel_id).unwrap();
        if channel_info.try_receive_prepare_entries(entry_metas) {
            info!("node {} channel {} learned entries from {}", self.id, channel_id, from);
        }

        channel_info.receive_promise(from);
        if channel_info.receive_majority_promise(config_stage) {
            info!(
                "node {} channel {} has receive majority promised and learned {} entries",
                self.id,
                channel_id,
                channel_info.max_received_entries.len()
            );
            if self.id != channel_id && learn {
                self.channel_already_learned(channel_id);
            } else {
                let channel_last_entry_id = self.log_buffer.channel_last_entry_id(channel_id);
                channel_info.to_leader(channel_last_entry_id, "granted");
                // Only the node's own channel gets a fresh empty entry in its new term.
                if channel_id == self.id {
                    self.append_empty_entry(channel_id);
                }
            }
        }
    }
1727
1728 fn promise_itself(&mut self, channel_id: u64, committed_entry_id: u64, learn: bool) {
1729 assert_ne!(channel_id, INDEX_CHANNEL_ID);
1730 let last_entry_id = self.log_buffer.channel_last_entry_id(channel_id);
1731 trace!(
1732 "node {} channel {} promise itself, committed entry id {}, last entry id {}",
1733 self.id,
1734 channel_id,
1735 committed_entry_id,
1736 last_entry_id
1737 );
1738 assert!(
1739 committed_entry_id <= last_entry_id,
1740 "committed entry id {} should less or equals to last entry id {}",
1741 committed_entry_id,
1742 last_entry_id,
1743 );
1744 let entry_metas = if learn {
1745 self.log_buffer
1746 .range_of_metas(channel_id, committed_entry_id + 1u64, last_entry_id)
1747 } else {
1748 Vec::new()
1749 };
1750 self.receive_prepare_promise(self.id, channel_id, learn, entry_metas);
1751 }
1752
1753 fn bcast_prepare_request(&mut self, channel_id: u64, learn: bool) {
1754 let matched_committed_id = self
1755 .channels
1756 .get(&channel_id)
1757 .unwrap()
1758 .matched_committed_id(self.id);
1759 debug!(
1760 "node {} channel {} bcast prepare request to {:?}, committed id {}",
1761 self.id,
1762 channel_id,
1763 self.remote_ids(),
1764 matched_committed_id,
1765 );
1766 let mut header = self.build_msg_header(channel_id);
1767 header.detail = MsgDetail::Prepare(PrepareMsg {
1768 learn,
1769 committed_entry_id: matched_committed_id,
1770 });
1771 for to in self.remote_ids() {
1772 let mut msg = header.clone();
1773 msg.to = to;
1774 self.ready.send_msg(msg);
1775 }
1776 self.promise_itself(channel_id, matched_committed_id, learn);
1777 }
1778
1779 fn send_timeout_now(&mut self, channel_id: u64, to: u64) {
1780 info!("node {} channel {} send timeout now to {}", self.id, channel_id, to);
1781 let mut msg_header = self.build_msg_header(channel_id);
1782 msg_header.to = to;
1783 msg_header.detail = MsgDetail::TimeoutNow;
1784 self.ready.send_msg(msg_header);
1785 }
1786
1787 fn transfer_leader(&mut self, channel_id: u64, to: u64) {
1788 let last_id = self.log_buffer.channel_last_entry_id(channel_id);
1789 let channel_info = self.channels.get(&channel_id).unwrap();
1790 if channel_info.is_remote_matched(to, last_id) {
1791 self.send_timeout_now(channel_id, to);
1792 }
1793 }
1794
1795 fn try_transfer_channel_leader(&mut self, channel_id: u64, to: u64) {
1796 if self.transfer.initiate(channel_id, to) {
1797 self.transfer_leader(channel_id, to);
1798 }
1799 }
1800
1801 fn advance_transfer_progress(&mut self, by: u64) {
1802 if self.transfer.doing()
1803 && self.transfer.channel_id == INDEX_CHANNEL_ID
1804 && self.transfer.to == by
1805 {
1806 self.transfer_leader(self.transfer.channel_id, self.transfer.to);
1807 }
1808 }
1809
    /// Ticks every channel once and performs the broadcast each one asks for.
    ///
    /// For each channel where `receive_channel_tick` returns `true`, the action depends on
    /// its role:
    /// - leaders send heartbeats;
    /// - the index-channel candidate requests votes;
    /// - other candidates request prepares.
    ///
    /// A leader of a member channel other than this node's own hands leadership back to the
    /// node with that id, unless a config change is pending. Any other role reaching the
    /// match panics.
    fn all_channels_tick(&mut self) {
        // Snapshot the timeout so every channel in this pass compares against the same
        // value, even though `receive_channel_tick` may re-roll it.
        let random_elect_timeout_tick = self.random_election_timeout_tick;
        // Index-based loop: the body needs `&mut self`, so `channel_ids` cannot stay borrowed.
        for idx in 0..self.channel_ids.len() {
            let channel_id = self.channel_ids[idx];
            if !self.receive_channel_tick(channel_id, random_elect_timeout_tick) {
                continue;
            }

            match self.channels.get(&channel_id).unwrap().role {
                Role::Leader => {
                    self.bcast_heartbeats(channel_id);
                    // Member channel ids are node ids; return leadership to the owning node.
                    if channel_id != self.id
                        && channel_id != INDEX_CHANNEL_ID
                        && self.pending_configs.is_none()
                    {
                        self.try_transfer_channel_leader(channel_id, channel_id);
                    }
                }
                Role::Candidate if channel_id == INDEX_CHANNEL_ID => {
                    assert_eq!(self.channel_ids.contains(&self.id), true);
                    self.bcast_vote_request();
                }
                Role::Candidate => {
                    assert_eq!(self.channel_ids.contains(&self.id), true);
                    self.bcast_prepare_request(channel_id, false);
                }
                _ => {
                    panic!("unexpected role")
                }
            }
        }
    }
1843
1844 fn reset_status_by_snapshot(
1845 &mut self,
1846 hard_states: &HashMap<u64, HardState>,
1847 desc: &SnapshotDesc,
1848 ) {
1849 self.log_buffer = MemStorage::recovery(self.id, &desc.channel_metas);
1850 self.snapshot_receving_states = RemoteSnapshotRecevingStates::new(self.id);
1851
1852 self.channel_ids = active_members(&desc.members);
1853 self.channel_ids.push(INDEX_CHANNEL_ID);
1854 self.channel_ids.sort();
1855
1856 let mut channels = HashMap::new();
1857 for channel_id in &self.channel_ids {
1858 let channel_last_id = self.log_buffer.channel_last_entry_id(*channel_id);
1859 let desc = ChannelDesc {
1860 channel_id: *channel_id,
1861 committed_id: channel_last_id,
1862 last_id: channel_last_id,
1863 hard_state: hard_states
1864 .get(channel_id)
1865 .unwrap_or(&HardState::default())
1866 .clone(),
1867 members: desc.members.clone(),
1868 };
1869 channels.insert(*channel_id, ChannelInfo::new(self.id, &desc));
1870 }
1871
1872 self.channels = channels;
1873 }
1874
1875 fn bcast_snapshot_finished(&mut self, received: bool, desc: &SnapshotDesc) {
1876 for channel_id in self.remote_ids() {
1877 let hints = desc
1878 .channel_metas
1879 .iter()
1880 .map(|(k, v)| (*k, v.id + 1u64))
1881 .collect::<HashMap<_, _>>();
1882 let mut msg = self.build_msg_header(channel_id);
1883 msg.to = channel_id;
1884 msg.detail = MsgDetail::SnapshotReply(SnapshotReplyMsg { received, hints });
1885 self.ready.send_msg(msg);
1886 }
1887 }
1888
1889 #[cfg(test)]
1890 fn enter_replicate_state(&mut self) {
1891 self.channels
1892 .iter_mut()
1893 .map(|(_, c)| c.progress_map.iter_mut())
1894 .flatten()
1895 .for_each(|(_, p)| {
1896 p.state = ProgressState::Replicate;
1897 p.active = true;
1898 });
1899 }
1900
1901 #[cfg(test)]
1902 fn reset_member_next_id(&mut self, nodes: Vec<u64>) {
1903 self.channels
1904 .iter_mut()
1905 .map(|(_, c)| c.progress_map.iter_mut())
1906 .flatten()
1907 .for_each(|(id, p)| {
1908 if nodes.contains(id) {
1909 p.next_id = p.match_id + 1u64;
1910 }
1911 });
1912 }
1913
1914 #[cfg(test)]
1915 fn remote_matched_id(&self, channel_id: u64, remote_id: u64) -> u64 {
1916 self.channels
1917 .get(&channel_id)
1918 .unwrap()
1919 .progress_map
1920 .get(&remote_id)
1921 .unwrap()
1922 .match_id
1923 }
1924
1925 fn check_index_leader(&self) -> Result<(), Error> {
1926 if !self.is_index_leader() {
1927 Err(Error::NotLeader)
1928 } else {
1929 Ok(())
1930 }
1931 }
1932
1933 fn check_local_leader(&self) -> Result<(), Error> {
1934 if !self.is_local_channel_leader() {
1935 Err(Error::NotLeader)
1936 } else {
1937 Ok(())
1938 }
1939 }
1940
1941 fn check_transfer_leader(&self) -> Result<(), Error> {
1942 if self.transfer.doing() && self.transfer.channel_id == INDEX_CHANNEL_ID {
1943 Err(Error::Transfering)
1944 } else {
1945 Ok(())
1946 }
1947 }
1948
1949 fn compaign_removing_channel_leaders(&mut self) -> bool {
1950 if let Some(p) = &self.pending_configs {
1951 let pending_channel_ids = p
1952 .old_configs
1953 .iter()
1954 .filter(|id| !self.is_channel_leader(**id))
1955 .cloned()
1956 .collect::<Vec<u64>>();
1957 debug!(
1958 "node {} channel {} pending configs old members {:?}, wait compaign channels {:?}",
1959 self.id, INDEX_CHANNEL_ID, p.old_configs, pending_channel_ids
1960 );
1961 if pending_channel_ids.is_empty() {
1962 return true;
1963 }
1964 for channel_id in pending_channel_ids {
1965 self.handle_timeout_now(self.id, channel_id);
1966 }
1967 }
1968 false
1969 }
1970
1971 fn submit_config_change_entry(&mut self, configs: HashSet<u64>, stage: ConfigStage) -> u64 {
1972 assert_eq!(self.is_index_leader(), true);
1973
1974 let entry_id = self.next_entry_id();
1975 let index_id = self.log_buffer.next_index_id();
1976 let local_term = self.local_entry_channel_term();
1977 let entry = Entry {
1978 request_id: CONFIG_CHANGE_ID,
1979 channel_id: self.id,
1980 entry_id: entry_id,
1981 index_id: INVALID_ID,
1982 channel_term: local_term,
1983 message: vec![],
1984 context: None,
1985 configs: Some(ChangeConfig {
1986 index_id,
1987 entry_id,
1988 term: local_term,
1989 stage,
1990 members: configs,
1991 }),
1992 };
1993
1994 self.append_entry(self.id, entry);
1995 self.assign_index(self.id, entry_id)
1996 }
1997
1998 fn update_channels_config_stage(&mut self) {
1999 assert_eq!(self.pending_configs.is_some(), true);
2000 let pending_configs = self.pending_configs.as_mut().unwrap();
2001 match &pending_configs.stage {
2002 ConfigStage::Old | ConfigStage::New => {}
2003 ConfigStage::Both => {
2004 for channel_id in &self.channel_ids {
2005 self.channels
2006 .get_mut(channel_id)
2007 .unwrap()
2008 .enter_both_config_stage(
2009 &pending_configs.new_configs,
2010 &pending_configs.old_configs,
2011 );
2012 }
2013 self.ready.should_stable_metas = true;
2014 }
2015 }
2016 }
2017
    /// Switches this node's channels to the membership of the pending config change.
    ///
    /// The steps are:
    /// - `channel_ids` becomes `pending_configs.configs` plus the index channel, sorted;
    /// - channels listed in `old_configs` are dropped;
    /// - every id in `new_configs` without an existing channel gets one, with committed id 0,
    ///   last id from the local log, and the hard state from `log_meta_view` (default when
    ///   absent);
    /// - every channel enters the new config stage, and metas are flagged for persistence.
    ///
    /// Panics if no config change is pending, or if `old_configs` contains the index
    /// channel.
    fn setup_new_config_channels<L>(&mut self, log_meta_view: &L)
    where
        L: LogMetaView,
    {
        assert_eq!(self.pending_configs.is_some(), true);
        let pending_configs = self.pending_configs.as_ref().unwrap();
        let members: Vec<u64> = pending_configs.configs.iter().cloned().collect();
        let mut new_channel_ids = members.clone();
        new_channel_ids.push(INDEX_CHANNEL_ID);
        new_channel_ids.sort();
        info!(
            "node {} channel {} update channels from {:?} to {:?}",
            self.id, INDEX_CHANNEL_ID, self.channel_ids, new_channel_ids
        );
        self.channel_ids = new_channel_ids;

        // The index channel must survive every config change.
        for id in &pending_configs.old_configs {
            assert_ne!(*id, INDEX_CHANNEL_ID);
            self.channels.remove(&id);
        }
        let hard_states = log_meta_view.hard_states();
        for channel_id in &pending_configs.new_configs {
            if self.channels.contains_key(channel_id) {
                info!(
                    "node {} channel {} already exists, ignore add command",
                    self.id, *channel_id
                );
                continue;
            }
            let desc = ChannelDesc {
                channel_id: *channel_id,
                committed_id: 0, last_id: self.log_buffer.channel_last_entry_id(*channel_id),
                hard_state: hard_states
                    .get(channel_id)
                    .unwrap_or(&HardState::default())
                    .clone(),
                members: members
                    .iter()
                    .map(|id| (*id, MemberState::default()))
                    .collect(),
            };
            self.channels
                .insert(*channel_id, ChannelInfo::new(self.id, &desc));
        }
        for channel_id in &self.channel_ids {
            self.channels
                .get_mut(channel_id)
                .unwrap()
                .enter_new_config_stage(&pending_configs.new_configs, &pending_configs.old_configs);
        }
        self.ready.should_stable_metas = true;
    }
2071
2072 fn is_pending_configs_in_new_stage(&self) -> bool {
2073 self.pending_configs
2074 .as_ref()
2075 .map(|c| c.stage)
2076 .unwrap_or(ConfigStage::Old)
2077 == ConfigStage::New
2078 }
2079
2080 fn maybe_apply_config_change<L>(&mut self, log_meta_view: &L)
2081 where
2082 L: LogMetaView,
2083 {
2084 assert_eq!(self.pending_configs.is_some(), true);
2085
2086 let pending_configs = self.pending_configs.as_ref().unwrap();
2087 assert_eq!(pending_configs.stage, ConfigStage::New);
2088 if pending_configs.index_id <= self.log_buffer.chosen_index_id {
2089 info!(
2090 "node {} channel {} config change to {:?} success",
2091 self.id, INDEX_CHANNEL_ID, pending_configs.configs
2092 );
2093 if self.is_index_leader() {
2094 self.bcast_each_channels(log_meta_view);
2097 }
2098 self.setup_new_config_channels(log_meta_view);
2099 self.pending_configs = None;
2100 self.ready.should_stable_metas = true;
2101 }
2102 }
2103
    /// Leader-only step of the staged config change: `Old` -> `Both` -> `New`.
    ///
    /// - `Old`: advances to `Both` unconditionally whenever called. The log message implies
    ///   callers only reach this after all channels are led locally; verify at the call
    ///   site.
    /// - `Both`: advances to `New` once the pending entry's index id has been chosen, and
    ///   flags metas for persistence.
    /// - otherwise: no-op.
    ///
    /// Each step does three things:
    /// - records the next index/entry ids as the pending position;
    /// - submits a config change entry for the new stage;
    /// - refreshes the channels' config stage.
    ///
    /// Panics unless this node leads the index channel and a config change is pending.
    fn maybe_advance_config_change_stage(&mut self) {
        assert_eq!(self.is_index_leader(), true);
        assert_eq!(self.pending_configs.is_some(), true);

        let pending_configs = self.pending_configs.as_mut().unwrap();
        match &pending_configs.stage {
            ConfigStage::Old => {
                info!(
                    "node {} channel {} config entry at {:?} has already compagin all channels leader, step to {:?}",
                    self.id, INDEX_CHANNEL_ID, ConfigStage::Old, ConfigStage::Both
                );
                pending_configs.stage = ConfigStage::Both;
                // Recorded before submitting; presumably these are the ids the submitted
                // entry receives (submit_config_change_entry reads the same next ids).
                pending_configs.index_id = self.log_buffer.next_index_id();
                pending_configs.entry_id = self.log_buffer.channel_next_entry_id(self.id);
                let configs = pending_configs.configs.clone();
                self.submit_config_change_entry(configs, ConfigStage::Both);
                self.update_channels_config_stage();
            }
            ConfigStage::Both if pending_configs.index_id <= self.log_buffer.chosen_index_id => {
                info!(
                    "node {} channel {} config entry at {:?} has already choosen, step to {:?}",
                    self.id,
                    INDEX_CHANNEL_ID,
                    ConfigStage::Both,
                    ConfigStage::New,
                );
                self.ready.should_stable_metas = true;
                pending_configs.stage = ConfigStage::New;
                pending_configs.index_id = self.log_buffer.next_index_id();
                pending_configs.entry_id = self.log_buffer.channel_next_entry_id(self.id);
                let configs = pending_configs.configs.clone();
                self.submit_config_change_entry(configs, ConfigStage::New);
                self.update_channels_config_stage();
            }
            _ => {}
        }
    }
2141
2142 fn check_pending_configs(&self) -> Result<(), Error> {
2143 if let None = &self.pending_configs {
2144 Ok(())
2145 } else {
2146 Err(Error::Busy)
2147 }
2148 }
2149
2150 fn release_memory(&mut self) {
2151 let mut replicated_index_ids = self
2152 .channels
2153 .get(&INDEX_CHANNEL_ID)
2154 .unwrap()
2155 .progress_map
2156 .iter()
2157 .map(|(_, p)| p.match_id)
2158 .collect::<Vec<_>>();
2159
2160 let total_numbers = replicated_index_ids.len();
2161 if total_numbers == 0 {
2162 return;
2163 }
2164
2165 replicated_index_ids.sort();
2166 let majority_replicated_index_id =
2167 replicated_index_ids[total_numbers - crate::progress::majority(total_numbers)];
2168
2169 let min_replicated_index_id = replicated_index_ids[0];
2170 let diff = majority_replicated_index_id - min_replicated_index_id;
2171 self.log_buffer.release_entries_until(if diff > 10000 {
2172 majority_replicated_index_id - (0.618 * diff as f64) as u64
2173 } else {
2174 min_replicated_index_id
2175 });
2176 }
2177
2178 fn forward_read_requests(&mut self) -> bool {
2181 let index_leader_id = self.get_index_channel().leader_id;
2182 if index_leader_id == INVALID_NODE_ID || index_leader_id == self.id {
2183 warn!("node {} try forward read msg to leader, but no such leader are found", self.id);
2184 return false;
2185 }
2186
2187 debug_assert_ne!(index_leader_id, INDEX_CHANNEL_ID);
2188
2189 let mut msg = self.build_msg_header(INDEX_CHANNEL_ID);
2190 msg.to = index_leader_id;
2191
2192 let last_request_id = self.next_read_request_id - 1u64;
2193 msg.detail = MsgDetail::Read(ReadMsg {
2194 request_id: last_request_id,
2195 });
2196 self.ready.send_msg(msg);
2197
2198 debug!(
2199 "node {} forward read msg with request id {} to leader {}",
2200 self.id, last_request_id, index_leader_id
2201 );
2202
2203 true
2204 }
2205
2206 fn advance_read_requests(&mut self) {
2207 if self.undoing_reads.is_empty() {
2208 return;
2209 }
2210
2211 let is_leader = self.is_index_leader();
2212 if !is_leader && !self.forward_read_requests() {
2213 return;
2214 }
2215
2216 let mut ureads: Vec<u64> = vec![];
2217 std::mem::swap(&mut self.undoing_reads, &mut ureads);
2218 self.pending_reads.extend(ureads.into_iter());
2219
2220 if is_leader {
2221 self.apply_read_reply(
2222 self.next_read_request_id,
2223 self.get_index_channel().current_term_safe_commit_id(),
2224 );
2225 }
2226 }
2227
2228 fn submit_read_task(&mut self) -> u64 {
2229 let request_id = self.next_read_request_id;
2230 self.next_read_request_id += 1;
2231 self.undoing_reads.push(request_id);
2232
2233 request_id
2234 }
2235
2236 fn transfer_tick(&mut self) {
2237 self.transfer.elapsed();
2238 if self.transfer.timeout() {
2239 info!(
2240 "node {} channel {} transfer leadership to {} timeout, abort transfer task",
2241 self.id, self.transfer.channel_id, self.transfer.to
2242 );
2243 self.transfer.abort();
2244 }
2245 }
2246}
2247
2248fn rebuild_pending_configs(
2249 id: u64,
2250 log_buffer: &MemStorage,
2251 channel_ids: &Vec<u64>,
2252) -> Option<PendingConfigs> {
2253 let entry = match log_buffer.last_unchosen_config_change_entry() {
2254 Some(e) => e,
2255 None => return None,
2256 };
2257
2258 let config_change = entry
2259 .configs
2260 .as_ref()
2261 .expect("this entry MUST be a config change");
2262 let old_members = filter_index_channel(channel_ids);
2263 debug!(
2264 "node {} channel {} found pending config change {:?} {:?}, old members: {:?}",
2265 id, INDEX_CHANNEL_ID, entry, config_change, old_members
2266 );
2267 Some(PendingConfigs::new(
2268 entry.channel_id,
2269 config_change.index_id,
2270 config_change.entry_id,
2271 config_change.term,
2272 config_change.stage,
2273 config_change.members.clone(),
2274 &old_members,
2275 ))
2276}
2277
2278fn rebuild_channels(
2279 id: u64,
2280 members: &HashMap<u64, MemberState>,
2281 log_buffer: &MemStorage,
2282 hard_states: &HashMap<u64, HardState>,
2283) -> (Vec<u64>, HashMap<u64, ChannelInfo>) {
2284 let mut channel_ids = active_members(members);
2287 channel_ids.push(INDEX_CHANNEL_ID);
2288 channel_ids.sort();
2289
2290 let mut channels = HashMap::new();
2291 for channel_id in &channel_ids {
2292 let committed_id = log_buffer.channel_first_entry_id(*channel_id);
2295 let desc = ChannelDesc {
2296 channel_id: *channel_id,
2297 committed_id: committed_id,
2298 last_id: log_buffer.channel_last_entry_id(*channel_id),
2299 hard_state: hard_states
2300 .get(channel_id)
2301 .unwrap_or(&HardState::default())
2302 .clone(),
2303 members: members.clone(),
2304 };
2305 channels.insert(*channel_id, ChannelInfo::new(id, &desc));
2306 assert_eq!(channels.get(channel_id).unwrap().get_local_match_id(), desc.last_id);
2307 }
2308 (channel_ids, channels)
2309}
2310
2311impl Sdcons {
2312 pub fn new(
2313 id: u64,
2314 applied_id: u64,
2315 option: SdconsOption,
2316 log_buffer: MemStorage,
2317 membership: &HashMap<u64, MemberState>,
2318 hard_states: &HashMap<u64, HardState>,
2319 ) -> Sdcons {
2320 let (channel_ids, channels) = rebuild_channels(id, membership, &log_buffer, hard_states);
2321 let pending_configs = rebuild_pending_configs(id, &log_buffer, &channel_ids);
2322 let random_tick = generate_election_timeout_tick(option.base_election_timeout_tick);
2323 let mut s = Sdcons {
2324 id,
2325 random_election_timeout_tick: random_tick,
2326 option,
2327 log_buffer,
2328 channel_ids,
2329 channels,
2330 pending_configs,
2331
2332 next_read_request_id: 1,
2333 pending_reads: VecDeque::new(),
2334 undoing_reads: Vec::new(),
2335
2336 choosen_id: applied_id,
2337 pending_id: applied_id,
2338 applied_id: applied_id,
2339
2340 transfer: TransferingRecord::default(),
2341 snapshot_state: SnapshotState::None,
2342 snapshot_receving_states: RemoteSnapshotRecevingStates::new(id),
2343
2344 ready: Ready::default(),
2345 };
2346
2347 if s.pending_configs.is_some() {
2348 s.update_channels_config_stage();
2349 }
2350
2351 s
2352 }
2353
2354 pub fn change_config(&mut self, members: Vec<u64>) -> Result<u64, Error> {
2355 self.check_index_leader()?;
2356 self.check_local_leader()?;
2357 self.check_transfer_leader()?;
2358 self.check_pending_configs()?;
2359
2360 let channel_ids = filter_index_channel(&self.channel_ids);
2361
2362 let pending_configs = PendingConfigs::new(
2364 self.id,
2365 INVALID_ID,
2366 INVALID_ID,
2367 INITIAL_TERM,
2368 ConfigStage::Old,
2369 members.into_iter().collect(),
2370 &channel_ids,
2371 );
2372
2373 debug!(
2374 "node {} receive change config: {:?} new configs: {:?}, old configs: {:?}",
2375 self.id,
2376 pending_configs.configs,
2377 pending_configs.new_configs,
2378 pending_configs.old_configs
2379 );
2380
2381 self.pending_configs = Some(pending_configs);
2382 Ok(0)
2383 }
2384
2385 pub fn submit_task(&mut self, request_id: u64, task: Task) -> Result<u64, Error> {
2386 self.check_local_leader()?;
2387 self.check_transfer_leader()?;
2388
2389 let entry_id = self.next_entry_id();
2390 let entry = Entry {
2391 request_id,
2392 channel_id: self.id,
2393 entry_id: entry_id,
2394 index_id: INVALID_ID,
2395 channel_term: self.local_entry_channel_term(),
2396 message: task.message,
2397 context: task.context,
2398 configs: None,
2399 };
2400
2401 self.append_entry(self.id, entry);
2402 if self.is_index_leader() {
2403 self.assign_index(self.id, entry_id);
2404 }
2405
2406 Ok(entry_id)
2407 }
2408
2409 pub fn leased_read(&mut self) -> Result<u64, Error> {
2412 self.check_local_leader()?;
2413 self.check_transfer_leader()?;
2414
2415 Ok(self.submit_read_task())
2416 }
2417
2418 pub fn tick(&mut self) {
2419 self.transfer_tick();
2420 self.all_channels_tick();
2421
2422 if self.is_index_leader() {
2425 if self.compaign_removing_channel_leaders() {
2426 self.maybe_advance_config_change_stage();
2427 }
2428 } else if !self.pending_reads.is_empty() {
2429 self.forward_read_requests();
2432 }
2433 }
2434
2435 pub fn step(&mut self, msg: Message) {
2436 if self.reject_staled_message(&msg) {
2437 return;
2438 }
2439 self.try_advance_channel_term(&msg);
2440 self.dispatch_message(msg);
2441 }
2442
2443 pub fn control(&mut self, c: Control) -> Result<(), Error> {
2444 match c {
2445 Control::Checkpoint => {
2446 if self.snapshot_state != SnapshotState::None {
2447 warn!(
2448 "node {} snapshot already in {:?} state, ignore checkpoint request",
2449 self.id, self.snapshot_state
2450 );
2451 } else {
2452 self.snapshot_state = SnapshotState::Creating;
2453 }
2454 }
2455 Control::ReleaseMemory => {
2456 self.release_memory();
2457 }
2458 Control::TimeoutNow => {
2459 info!(
2460 "node {} channel {} receive timeout now, start campaign",
2461 self.id, INDEX_CHANNEL_ID
2462 );
2463 self.handle_timeout_now(self.id, INDEX_CHANNEL_ID);
2464 }
2465 Control::TransferLeader(node_id) => {
2466 self.check_index_leader()?;
2467 if node_id != self.id {
2468 if !self.transfer.initiate(INDEX_CHANNEL_ID, node_id) {
2469 return Err(Error::Transfering);
2470 }
2471
2472 info!(
2473 "node {} channel {} start transfer leadership to {}",
2474 self.id, INDEX_CHANNEL_ID, node_id
2475 );
2476 self.advance_transfer_progress(node_id);
2477 } else {
2478 warn!(
2479 "node {} channel {} try transfer leadership to self",
2480 self.id, INDEX_CHANNEL_ID
2481 );
2482 }
2483 }
2484 }
2485 Ok(())
2486 }
2487
2488 pub fn advance<L>(&mut self, log_meta_view: &L) -> Ready
2489 where
2490 L: LogMetaView,
2491 {
2492 self.bcast_each_channels(log_meta_view);
2493 self.advance_choose_progress();
2494 self.advance_read_requests();
2495
2496 if self.snapshot_state != SnapshotState::Loading {
2498 self.extract_chosen_entries();
2499 self.extract_unstable_entries();
2500 if self.is_pending_configs_in_new_stage() {
2501 self.maybe_apply_config_change(log_meta_view);
2502 }
2503 }
2504
2505 if self.ready.chosen_entries.len() > 0 {
2506 debug!("apply {:?}", self.ready.chosen_entries);
2507 }
2508
2509 self.collect_state_snapshot();
2512 std::mem::take(&mut self.ready)
2513 }
2514
2515 pub fn submit_apply_result(&mut self, _from: u64, to: u64) -> bool {
2516 self.log_buffer.submit_applied_result(to)
2517 }
2518
2519 pub fn submit_stable_result(&mut self, channel_id: u64, _from: u64, to: u64) -> bool {
2520 if let Some(channel_info) = self.channels.get_mut(&channel_id) {
2521 channel_info.update_local_match(to);
2522 }
2523 if channel_id == INDEX_CHANNEL_ID {
2524 self.log_buffer.submit_stable_index_result(to)
2525 } else {
2526 self.log_buffer.submit_stable_result(channel_id, to)
2527 }
2528 }
2529
2530 pub fn log_replicated(&mut self, node_id: u64, channel_id: u64, next_id: u64) {
2531 match self.channels.get_mut(&channel_id) {
2532 Some(c) => c.log_replicated(node_id, next_id),
2533 None => {}
2534 };
2535 }
2536
2537 pub fn finish_snapshot_loading(
2538 &mut self,
2539 received: bool,
2540 hard_states: &HashMap<u64, HardState>,
2541 desc: &SnapshotDesc,
2542 ) {
2543 if received {
2544 self.reset_status_by_snapshot(hard_states, desc);
2545 }
2546 self.bcast_snapshot_finished(received, desc);
2547 self.snapshot_state = SnapshotState::None;
2548 }
2549
2550 pub fn finish_checkpoint(&mut self) {
2551 self.snapshot_state = SnapshotState::None;
2552 }
2553
2554 pub fn is_sending_snapshot(&self) -> bool {
2555 self.snapshot_receving_states
2556 .is_any_remote_receiving_snapshot()
2557 }
2558
2559 pub fn leader_id(&self) -> Option<u64> {
2560 let leader = self.get_index_channel().leader_id;
2561 if leader == INVALID_NODE_ID {
2562 None
2563 } else {
2564 Some(leader)
2565 }
2566 }
2567}
2568
2569#[cfg(test)]
2570mod tests {
2571 use super::*;
2572
2573 use std::collections::HashMap;
2574 use std::iter::FromIterator;
2575
2576 use log::{Metadata, Record};
2577
2578 struct SimpleLogger;
2579 impl log::Log for SimpleLogger {
2580 fn enabled(&self, _metadata: &Metadata) -> bool {
2581 true
2582 }
2583
2584 fn log(&self, record: &Record) {
2585 println!(
2586 "[{} - {} - {}:{}] {}",
2587 record.level(),
2588 record.target(),
2589 record.file().unwrap_or("unknown"),
2590 record.line().unwrap_or(0),
2591 record.args()
2592 );
2593 }
2594
2595 fn flush(&self) {}
2596 }
2597
2598 static LOGGER: SimpleLogger = SimpleLogger;
2599
2600 struct LogMeta {
2601 pub first_index_id: u64,
2602 pub last_index_id: u64,
2603 pub hard_state_map: HashMap<u64, HardState>,
2604 pub members: HashMap<u64, MemberState>,
2605 }
2606
2607 impl Default for LogMeta {
2608 fn default() -> Self {
2609 LogMeta {
2610 first_index_id: INVALID_ID,
2611 last_index_id: INVALID_ID,
2612 hard_state_map: HashMap::new(),
2613 members: HashMap::new(),
2614 }
2615 }
2616 }
2617
2618 impl LogMetaView for LogMeta {
2619 fn membership(&self) -> HashMap<u64, MemberState> {
2620 self.members.clone()
2621 }
2622
2623 fn hard_states(&self) -> HashMap<u64, HardState> {
2624 self.hard_state_map.clone()
2625 }
2626
2627 fn range_of(&self, _channel_id: u64) -> (u64, u64) {
2628 (self.first_index_id, self.last_index_id)
2629 }
2630
2631 fn latest_snapshot(&self) -> Option<SnapshotDesc> {
2632 None
2633 }
2634 }
2635
2636 static SETUP_LOGGER: std::sync::Once = std::sync::Once::new();
2637 fn init_sdcons(log_meta: &LogMeta) -> Sdcons {
2638 let local_id = 1;
2639 let mut mem_store = MemStorage::new(local_id);
2640 for (id, _) in &log_meta.members {
2641 mem_store.append_entries(*id, vec![]);
2642 }
2643 init_sdcons_with_mem_store(log_meta, mem_store)
2644 }
2645
2646 fn init_sdcons_with_mem_store(log_meta: &LogMeta, mem_store: MemStorage) -> Sdcons {
2647 SETUP_LOGGER.call_once(|| {
2648 log::set_logger(&LOGGER)
2649 .map(|()| log::set_max_level(log::LevelFilter::Trace))
2650 .expect("init logger");
2651 });
2652
2653 let local_id = 1;
2654 let opt = SdconsOption::default();
2655 let hard_states = log_meta.hard_states();
2656 let membership = log_meta.membership();
2657 Sdcons::new(local_id, INVALID_ID, opt, mem_store, &membership, &hard_states)
2658 }
2659
2660 fn task(value: u8) -> Task {
2661 Task {
2662 message: vec![value],
2663 context: None,
2664 }
2665 }
2666
2667 fn wait_timeout(sd: &mut Sdcons) {
2668 info!("wait node {} election timeout", sd.id);
2669 let timeout_ticks = sd.option.base_election_timeout_tick * 2;
2670 for _ in 0..timeout_ticks {
2671 sd.tick();
2672 }
2673 }
2674
2675 fn new_entry(member_id: u64, entry_id: u64, term: u64) -> Entry {
2676 Entry {
2677 entry_id,
2678 index_id: INVALID_ID,
2679 request_id: 2,
2680 channel_id: member_id,
2681 channel_term: term,
2682 message: vec![0, 1, 2, 3],
2683 context: None,
2684 configs: None,
2685 }
2686 }
2687
2688 fn new_index_with_id(member_id: u64, index_id: u64, entry_id: u64, term: u64) -> LogIndex {
2689 LogIndex {
2690 channel_id: member_id,
2691 entry_id,
2692 term,
2693 index_id,
2694 context: None,
2695 }
2696 }
2697
2698 fn accept_prepare(ready: &mut Ready, s: &mut Sdcons, entries_map: HashMap<u64, Vec<Entry>>) {
2699 info!("accept prepare msgs of node {}", s.id);
2700 let msgs = ready
2701 .msgs
2702 .iter()
2703 .map(|(_id, msgs)| msgs.iter())
2704 .flatten()
2705 .collect::<Vec<_>>();
2706 for msg in msgs {
2707 let mut reply = Message {
2708 from: msg.to,
2709 to: msg.from,
2710 index_term: msg.index_term,
2711 channel_id: msg.channel_id,
2712 channel_term: msg.channel_term,
2713 detail: MsgDetail::None,
2714 };
2715 if let MsgDetail::Prepare(p) = &msg.detail {
2716 let mut entries = entries_map.get(&msg.to).unwrap_or(&vec![]).clone();
2717 for entry in &mut entries {
2718 entry.channel_id = msg.channel_id;
2719 }
2720 reply.detail = MsgDetail::PrepareReply(PrepareReplyMsg {
2721 reject: false,
2722 learn: p.learn,
2723 entry_metas: entries.iter().map(|e| EntryMeta::from(e)).collect(),
2724 });
2725 s.step(reply);
2726 };
2727 }
2728 }
2729
2730 fn accept_election(ready: &mut Ready, s: &mut Sdcons) {
2731 info!("accept all election msgs of node {}", s.id);
2732 let msgs = ready
2733 .msgs
2734 .iter()
2735 .map(|(_id, msgs)| msgs.iter())
2736 .flatten()
2737 .collect::<Vec<_>>();
2738 for msg in &msgs {
2739 let mut reply = Message {
2740 from: msg.to,
2741 to: msg.from,
2742 index_term: msg.index_term,
2743 channel_id: msg.channel_id,
2744 channel_term: msg.channel_term,
2745 detail: MsgDetail::None,
2746 };
2747 match &msg.detail {
2748 MsgDetail::Prepare(p) => {
2749 reply.detail = MsgDetail::PrepareReply(PrepareReplyMsg {
2750 reject: false,
2751 learn: p.learn,
2752 entry_metas: Vec::new(),
2753 });
2754 s.step(reply);
2755 }
2756 MsgDetail::Vote(_) => {
2757 reply.detail = MsgDetail::VoteReply(VoteReplyMsg { reject: false });
2758 s.step(reply);
2759 }
2760 _ => {}
2761 }
2762 }
2763 }
2764
    /// A lone member needs no peer replies: after one election timeout it is
    /// both the index leader and the leader of its own channel.
    #[test]
    fn single_member_election() {
        let mut meta = LogMeta::default();
        meta.members.insert(1, MemberState::default());
        let mut s = init_sdcons(&meta);

        wait_timeout(&mut s);
        assert_eq!(s.is_index_leader(), true);
        assert_eq!(s.is_local_channel_leader(), true);
    }
2775
    /// Three-member election, in two rounds.
    ///
    /// After the timeout node 1 is a candidate on both its own channel and the
    /// index channel. Accepting the first round of replies makes it leader of
    /// its own channel and a student of the index channel. Accepting the next
    /// round makes it index leader.
    #[test]
    fn multi_member_election() {
        let mut meta = LogMeta::default();
        meta.members.insert(1, MemberState::default());
        meta.members.insert(2, MemberState::default());
        meta.members.insert(3, MemberState::default());
        let mut s = init_sdcons(&meta);

        wait_timeout(&mut s);
        assert_eq!(s.is_channel_candidate(s.id), true);
        assert_eq!(s.is_channel_candidate(INDEX_CHANNEL_ID), true);

        // First round: grant the election requests.
        let mut ready = s.advance(&meta);
        accept_election(&mut ready, &mut s);

        assert_eq!(s.is_local_channel_leader(), true);
        assert_eq!(s.is_index_student(), true);

        // Second round: grant what the index student sends next.
        let mut ready = s.advance(&meta);
        accept_election(&mut ready, &mut s);
        assert_eq!(s.is_index_leader(), true);
    }
2798
    /// While becoming index leader in a five-member cluster, node 1 learns
    /// channel entries from the prepare replies.
    ///
    /// Node 2 reports a longer log (ids 1..=4, terms 3/3/3/4). Nodes 3-5 each
    /// report a single entry (id 1, term 9). The test expects the learned
    /// entry meta of the first learned voter to be id 1 / term 9.
    #[test]
    fn learn_refreshed_entries() {
        let mut meta = LogMeta::default();
        meta.members.insert(1, MemberState::default());
        meta.members.insert(2, MemberState::default());
        meta.members.insert(3, MemberState::default());
        meta.members.insert(4, MemberState::default());
        meta.members.insert(5, MemberState::default());
        // Node 1's own channel starts at term 10.
        meta.hard_state_map.insert(
            1,
            HardState {
                voted_for: INVALID_NODE_ID,
                current_term: 10,
            },
        );
        let mut s = init_sdcons(&meta);

        wait_timeout(&mut s);
        assert_eq!(s.is_channel_candidate(s.id), true);
        assert_eq!(s.is_channel_candidate(INDEX_CHANNEL_ID), true);

        let mut ready = s.advance(&meta);
        accept_election(&mut ready, &mut s);

        assert_eq!(s.is_local_channel_leader(), true);
        assert_eq!(s.is_index_student(), true);

        // Per replying node: the entries it reports in its PrepareReply.
        let mut ready = s.advance(&meta);
        let mut entries_map = HashMap::new();
        entries_map.insert(
            2,
            vec![
                new_entry(1, 1, 3),
                new_entry(1, 2, 3),
                new_entry(1, 3, 3),
                new_entry(1, 4, 4),
            ],
        );
        entries_map.insert(3, vec![new_entry(1, 1, 9)]);
        entries_map.insert(4, vec![new_entry(1, 1, 9)]);
        entries_map.insert(5, vec![new_entry(1, 1, 9)]);
        accept_prepare(&mut ready, &mut s, entries_map);

        assert_eq!(s.is_index_leader(), true);
        // Any channel id recorded in the index channel's `learned_voters`.
        let c = s
            .channels
            .get(&INDEX_CHANNEL_ID)
            .unwrap()
            .learned_voters
            .iter()
            .next()
            .unwrap();
        let channel_info = s.channels.get(c).unwrap();
        let entry_meta = channel_info.last_learned_entry_meta();
        assert_eq!(entry_meta.id, 1);
        assert_eq!(entry_meta.term, 9);
    }
2856
    /// Tasks submitted while node 1 is still only an index student are
    /// eventually indexed and chosen.
    ///
    /// After the prepare round completes and the appends and indexes are
    /// acknowledged and made stable, each of the three requests must be
    /// chosen exactly once.
    #[test]
    fn assign_indexes_to_entries_recieved_in_electing() {
        let meta = init_log_meta_with_default_members();
        let mut s = init_sdcons(&meta);

        wait_timeout(&mut s);
        assert_eq!(s.is_channel_candidate(s.id), true);
        assert_eq!(s.is_channel_candidate(INDEX_CHANNEL_ID), true);

        let mut ready = s.advance(&meta);
        accept_election(&mut ready, &mut s);

        assert_eq!(s.is_local_channel_leader(), true);
        assert_eq!(s.is_index_student(), true);

        // Submitted before index leadership is complete.
        let requests = vec![1, 2, 3];
        submit_tasks(&mut s, &requests);

        let mut ready = s.advance(&meta);
        accept_prepare(&mut ready, &mut s, HashMap::new());

        s.enter_replicate_state();

        // Acknowledge everything from every peer and persist locally.
        let ready = s.advance(&meta);
        accept_append_entries(&ready, &mut s, &HashSet::new());
        accept_indexes(&ready, &mut s, &HashSet::new());
        stable_all_entries(&ready, &mut s);
        stable_indexes(&ready, &mut s);

        s.enter_replicate_state();

        let ready = s.advance(&meta);
        assert_chosen_entries(&ready, &requests);
    }
2892
    /// Vote handling on the index channel.
    ///
    /// The node starts at index term 10 and holds two indexes (ids 1 and 2,
    /// terms 2 and 4). The cases checked below are:
    /// - a request carrying index term 1 earns no vote;
    /// - at index term 10, a candidate reporting last index 0 / term 0 earns
    ///   no vote;
    /// - a candidate reporting last index 10 / term 10 gets the vote;
    /// - a later request from node 3, even with a higher last index/term,
    ///   leaves `voted_for` at 2.
    #[test]
    fn handle_vote_request() {
        let mut meta = LogMeta::default();
        meta.members.insert(1, MemberState::default());
        meta.members.insert(2, MemberState::default());
        meta.members.insert(3, MemberState::default());
        meta.hard_state_map.insert(
            INDEX_CHANNEL_ID,
            HardState {
                voted_for: INVALID_NODE_ID,
                current_term: 10,
            },
        );

        let mut mem_store = MemStorage::new(1);
        mem_store.append_entries(1, vec![]);
        mem_store.append_entries(2, vec![]);
        mem_store.append_entries(3, vec![]);
        // Index 1 -> entry 1 (term 2), index 2 -> entry 3 (term 4), both on channel 1.
        mem_store
            .extend_indexes(vec![new_index_with_id(1, 1, 1, 2), new_index_with_id(1, 2, 3, 4)]);
        let mut s = init_sdcons_with_mem_store(&meta, mem_store);
        assert_eq!(s.get_index_term(), 10);
        assert_eq!(s.get_index_channel().voted_for, INVALID_NODE_ID);

        // Index term 1 is below the node's index term 10.
        let mut msg = Message {
            from: 2,
            to: 1,
            index_term: 1,
            channel_id: INDEX_CHANNEL_ID,
            channel_term: 10,
            detail: MsgDetail::Vote(VoteMsg {
                last_index_id: 0,
                last_index_term: 0,
            }),
        };
        s.step(msg.clone());
        assert_eq!(s.get_index_channel().voted_for, INVALID_NODE_ID);

        // Matching index term, but the candidate reports an empty index log.
        msg.index_term = 10;
        s.step(msg.clone());
        assert_eq!(s.get_index_channel().voted_for, INVALID_NODE_ID);

        msg.detail = MsgDetail::Vote(VoteMsg {
            last_index_id: 10,
            last_index_term: 10,
        });
        s.step(msg.clone());
        assert_eq!(s.get_index_channel().voted_for, 2);

        // The vote has already been granted to node 2.
        msg.from = 3;
        msg.detail = MsgDetail::Vote(VoteMsg {
            last_index_id: 11,
            last_index_term: 11,
        });
        s.step(msg.clone());
        assert_eq!(s.get_index_channel().voted_for, 2);
    }
2954
    /// Prepare handling on channel 1 (starting at term 10).
    ///
    /// The first prepare from node 2 at term 10 records node 2 as
    /// `voted_for`. A repeated prepare from node 2, and a prepare from node 3,
    /// both leave it unchanged.
    #[test]
    fn handle_prepare_req() {
        let mut meta = LogMeta::default();
        meta.members.insert(1, MemberState::default());
        meta.members.insert(2, MemberState::default());
        meta.members.insert(3, MemberState::default());
        meta.hard_state_map.insert(
            1,
            HardState {
                voted_for: INVALID_NODE_ID,
                current_term: 10,
            },
        );

        let mut mem_store = MemStorage::new(1);
        mem_store.append_entries(1, vec![]);
        mem_store.append_entries(2, vec![]);
        mem_store.append_entries(3, vec![]);
        mem_store
            .extend_indexes(vec![new_index_with_id(1, 1, 1, 2), new_index_with_id(1, 2, 3, 4)]);
        let mut s = init_sdcons_with_mem_store(&meta, mem_store);
        assert_eq!(s.channels.get(&1).unwrap().term, 10);
        assert_eq!(s.channels.get(&1).unwrap().voted_for, INVALID_NODE_ID);

        let mut msg = Message {
            from: 2,
            to: 1,
            index_term: 1,
            channel_id: 1,
            channel_term: 10,
            detail: MsgDetail::Prepare(PrepareMsg {
                learn: false,
                committed_entry_id: 0,
            }),
        };
        s.step(msg.clone());
        assert_eq!(s.channels.get(&1).unwrap().voted_for, 2);

        // Same prepare again: still voted for node 2.
        s.step(msg.clone());
        assert_eq!(s.channels.get(&1).unwrap().voted_for, 2);

        // A competing prepare from node 3 does not change the vote.
        msg.from = 3;
        s.step(msg.clone());
        assert_eq!(s.channels.get(&1).unwrap().voted_for, 2);
    }
3003
    /// A message with channel term 10 turns the node into a follower at
    /// term 10 on that channel.
    ///
    /// This is checked for both the index channel and channel 1, starting from
    /// each role setup below. The role setups only drive the node through
    /// election steps; they do not assert that the role was reached.
    #[test]
    fn to_follower_when_receive_high_term() {
        let channels = vec![INDEX_CHANNEL_ID, 1];
        let roles = vec![Role::Follower, Role::Leader, Role::Student, Role::Candidate];
        for channel_id in channels {
            for role in &roles {
                debug!("execute role {} channel id {}", role, channel_id);

                let mut meta = LogMeta::default();
                meta.members.insert(1, MemberState::default());
                meta.members.insert(2, MemberState::default());
                meta.members.insert(3, MemberState::default());
                let mut s = init_sdcons(&meta);
                match role {
                    Role::Follower => {}
                    Role::Student | Role::Leader => {
                        wait_timeout(&mut s);
                        let mut ready = s.advance(&meta);
                        accept_election(&mut ready, &mut s);
                    }
                    Role::Candidate => {
                        wait_timeout(&mut s);
                    }
                }

                // For the index channel the index term doubles as the channel
                // term, so it is raised too.
                let index_term = if channel_id == INDEX_CHANNEL_ID {
                    10
                } else {
                    1
                };

                let msg = Message {
                    from: 2,
                    to: 1,
                    index_term,
                    channel_id: channel_id,
                    channel_term: 10,
                    detail: MsgDetail::None,
                };
                s.step(msg.clone());

                assert_eq!(s.is_channel_follower(channel_id), true);
                assert_eq!(s.get_channel_term(channel_id), 10);
            }
        }
    }
3050
    /// A candidate on the index channel that receives a `Declare` from node 2
    /// at its own term becomes a follower without changing term.
    #[test]
    fn to_follower_when_someone_declare_leader() {
        let mut meta = LogMeta::default();
        meta.members.insert(1, MemberState::default());
        meta.members.insert(2, MemberState::default());
        meta.members.insert(3, MemberState::default());
        let mut s = init_sdcons(&meta);
        wait_timeout(&mut s);
        let channel_term = s.get_channel_term(INDEX_CHANNEL_ID);
        let msg = Message {
            from: 2,
            to: 1,
            index_term: channel_term,
            channel_id: INDEX_CHANNEL_ID,
            channel_term: channel_term,
            detail: MsgDetail::Declare(DeclareMsg { committed_id: 0 }),
        };
        s.step(msg.clone());

        assert_eq!(s.is_channel_follower(INDEX_CHANNEL_ID), true);
        assert_eq!(s.get_channel_term(INDEX_CHANNEL_ID), channel_term);
    }
3073
3074 fn init_log_meta_with_members(members: &Vec<u64>) -> LogMeta {
3075 let mut meta = LogMeta::default();
3076 for id in members {
3077 meta.members.insert(*id, MemberState::default());
3078 }
3079 meta
3080 }
3081
3082 fn init_log_meta_with_default_members() -> LogMeta {
3083 init_log_meta_with_members(&vec![1, 2, 3])
3084 }
3085
    /// Once node 1 has become index leader in a three-member cluster, the next
    /// `advance` carries at least one unstable index.
    #[test]
    fn bcast_indexes() {
        let meta = init_log_meta_with_default_members();
        let mut s = init_sdcons(&meta);
        wait_timeout(&mut s);
        let mut ready = s.advance(&meta);
        accept_election(&mut ready, &mut s);
        let mut ready = s.advance(&meta);
        accept_prepare(&mut ready, &mut s, HashMap::new());
        assert_eq!(s.is_index_leader(), true);
        let ready = s.advance(&meta);
        assert_eq!(ready.unstable_indexes.len() > 0, true);
    }
3099
3100 fn submit_tasks(s: &mut Sdcons, requests: &Vec<u64>) {
3101 for req_id in requests {
3102 s.submit_task(
3103 *req_id + FIRST_USER_REQUEST,
3104 Task {
3105 message: vec![0, 1],
3106 context: None,
3107 },
3108 )
3109 .expect("submit task");
3110 }
3111 }
3112
3113 fn stable_all_entries(ready: &Ready, s: &mut Sdcons) {
3114 let mut map = HashMap::new();
3115 for entry in &ready.unstable_entries {
3116 let v = map.entry(entry.channel_id).or_default();
3117 *v = entry.entry_id;
3118 }
3119 for (channel_id, v) in map {
3120 s.submit_stable_result(channel_id, 0, v);
3121 }
3122 }
3123
3124 fn stable_indexes(ready: &Ready, s: &mut Sdcons) {
3125 if ready.unstable_indexes.is_empty() {
3126 return;
3127 }
3128 let first_id = ready
3129 .unstable_indexes
3130 .first()
3131 .map(|i| i.index_id)
3132 .unwrap_or(INVALID_ID);
3133 let last_id = ready
3134 .unstable_indexes
3135 .last()
3136 .map(|i| i.index_id)
3137 .unwrap_or(INVALID_ID);
3138 s.submit_stable_result(INDEX_CHANNEL_ID, first_id, last_id);
3139 }
3140
3141 fn apply_entries(ready: &Ready, s: &mut Sdcons) {
3142 s.submit_apply_result(ready.first_apply_index_id, ready.last_apply_index_id);
3143 }
3144
3145 fn assert_chosen_entries(ready: &Ready, requests: &Vec<u64>) {
3146 let chosen_requests = ready
3147 .chosen_entries
3148 .iter()
3149 .map(|e| e.request_id)
3150 .collect::<Vec<_>>();
3151 debug!("chosen entries {:?}", chosen_requests);
3152 for request_id in requests {
3153 let count = chosen_requests
3154 .iter()
3155 .filter(|id| **id == (request_id + FIRST_USER_REQUEST))
3156 .count();
3157 assert_eq!(count, 1);
3158 }
3159 }
3160
3161 fn assert_chosen_config_change_entries(ready: &Ready) {
3162 let chosen_requests = ready
3163 .chosen_entries
3164 .iter()
3165 .map(|e| e.request_id)
3166 .collect::<Vec<_>>();
3167 debug!("chosen entries {:?}", chosen_requests);
3168
3169 let count = chosen_requests
3170 .iter()
3171 .filter(|id| **id == CONFIG_CHANGE_ID)
3172 .count();
3173 assert_eq!(count, 1);
3174 }
3175
3176 fn assert_replicate_entries(ready: &Ready, target_id: u64, requests: &Vec<u64>) {
3177 let msgs = ready
3178 .msgs
3179 .iter()
3180 .map(|(_id, msgs)| msgs.iter())
3181 .flatten()
3182 .collect::<Vec<_>>();
3183 let replicate_requests = msgs
3184 .iter()
3185 .filter_map(|m| {
3186 if m.to != target_id {
3187 None
3188 } else if let MsgDetail::Append(p) = &m.detail {
3189 Some(p.entries.clone())
3190 } else {
3191 None
3192 }
3193 })
3194 .flatten()
3195 .map(|e| e.request_id)
3196 .collect::<HashSet<_>>();
3197
3198 debug!("replicate entries {:?}", replicate_requests);
3199 for request_id in requests {
3200 let actual_request_id = request_id + FIRST_USER_REQUEST;
3201 assert_eq!(replicate_requests.contains(&actual_request_id), true);
3202 }
3203 }
3204
    /// Choose pipeline for a lone member, over two batches of requests.
    ///
    /// Freshly submitted entries are not chosen in the same `advance`. After
    /// the entries and indexes are reported stable, the next `advance` chooses
    /// each request exactly once, and the apply range is acknowledged.
    #[test]
    fn single_member_choose() {
        let mut meta = LogMeta::default();
        meta.members.insert(1, MemberState::default());
        let mut s = init_sdcons(&meta);
        wait_timeout(&mut s);
        assert_eq!(s.is_index_leader(), true);
        assert_eq!(s.is_channel_leader(1), true);

        let inputs = vec![vec![1, 2, 3], vec![4, 5, 6]];
        for requests in &inputs {
            submit_tasks(&mut s, requests);
            // Nothing can be chosen before it is persisted.
            let ready = s.advance(&meta);
            assert_eq!(ready.chosen_entries.is_empty(), true);
            stable_all_entries(&ready, &mut s);
            stable_indexes(&ready, &mut s);

            let ready = s.advance(&meta);
            assert_chosen_entries(&ready, requests);
            apply_entries(&ready, &mut s);
        }
    }
3227
3228 fn to_leader(s: &mut Sdcons, meta: &LogMeta) {
3229 if s.channels.len() == 2 {
3230 wait_timeout(s);
3231 assert_eq!(s.is_index_leader(), true);
3232 assert_eq!(s.get_index_channel().leader_id, 1);
3233 assert_eq!(s.is_channel_leader(1), true);
3234 } else {
3235 wait_timeout(s);
3236 let mut ready = s.advance(meta);
3237 accept_election(&mut ready, s);
3238 assert_eq!(s.is_index_student(), true);
3239 assert_eq!(s.get_index_channel().leader_id, 1);
3240 assert_eq!(s.is_local_channel_leader(), true);
3241 let mut ready = s.advance(meta);
3242 let map = HashMap::new();
3243 accept_prepare(&mut ready, s, map);
3244 assert_eq!(s.is_index_leader(), true);
3245 }
3246 }
3247
    /// The index leader replicates each submitted batch to both peers.
    ///
    /// An `advance` with nothing new produces no messages. A rejecting
    /// `AppendReply` from node 2 with hint 1 makes the leader resend the whole
    /// log (requests 1..=7) to node 2.
    #[test]
    fn replicate_entries() {
        let meta = init_log_meta_with_default_members();
        let mut s = init_sdcons(&meta);
        to_leader(&mut s, &meta);
        s.enter_replicate_state();

        let entries = vec![1, 2, 3];
        submit_tasks(&mut s, &entries);
        let ready = s.advance(&meta);
        assert_replicate_entries(&ready, 2, &entries);
        assert_replicate_entries(&ready, 3, &entries);

        let entries = vec![4, 5, 6];
        submit_tasks(&mut s, &entries);
        let ready = s.advance(&meta);
        assert_replicate_entries(&ready, 2, &entries);
        assert_replicate_entries(&ready, 3, &entries);

        let entries = vec![7];
        submit_tasks(&mut s, &entries);
        let ready = s.advance(&meta);
        assert_replicate_entries(&ready, 2, &entries);
        assert_replicate_entries(&ready, 3, &entries);

        // Nothing new was submitted, so nothing is sent.
        let ready = s.advance(&meta);
        assert_eq!(ready.msgs.len(), 0);

        // Node 2 rejects with hint 1: the leader must restart from entry 1.
        let index_term = s.get_index_term();
        let channel_term = s.get_channel_term(1);
        let msg = Message {
            from: 2,
            to: 1,
            index_term: index_term,
            channel_id: 1,
            channel_term: channel_term,
            detail: MsgDetail::AppendReply(AppendReplyMsg {
                reject: true,
                entry_id: 1,
                hint_id: 1,
            }),
        };
        s.step(msg.clone());
        let ready = s.advance(&meta);
        assert_replicate_entries(&ready, 2, &vec![1, 2, 3, 4, 5, 6, 7]);
    }
3298
3299 fn declare_leader(s: &mut Sdcons, index_term: u64, channel_id: u64, channel_term: u64) {
3300 let msg = Message {
3301 from: 2,
3302 to: 1,
3303 index_term,
3304 channel_id,
3305 channel_term,
3306 detail: MsgDetail::Declare(DeclareMsg {
3307 committed_id: INVALID_ID,
3308 }),
3309 };
3310 s.step(msg);
3311 }
3312
3313 fn declare_with_id(
3314 s: &mut Sdcons,
3315 from: u64,
3316 index_term: u64,
3317 channel_id: u64,
3318 channel_term: u64,
3319 committed_id: u64,
3320 ) {
3321 let msg = Message {
3322 from,
3323 to: s.id,
3324 index_term,
3325 channel_id,
3326 channel_term,
3327 detail: MsgDetail::Declare(DeclareMsg { committed_id }),
3328 };
3329 s.step(msg);
3330 }
3331
3332 fn assert_append_reply(ready: &Ready, to: u64, entry_id: u64, reject: bool) {
3333 let msgs = ready
3334 .msgs
3335 .iter()
3336 .map(|(_id, msgs)| msgs.iter())
3337 .flatten()
3338 .collect::<Vec<_>>();
3339 let replies = msgs
3340 .iter()
3341 .filter_map(|m| {
3342 if m.to != to {
3343 None
3344 } else if let MsgDetail::AppendReply(p) = &m.detail {
3345 Some(p.clone())
3346 } else {
3347 None
3348 }
3349 })
3350 .collect::<Vec<_>>();
3351
3352 assert_eq!(replies.len(), 1);
3353 let reply = &replies[0];
3354 assert_eq!(reply.reject, reject);
3355 assert_eq!(reply.hint_id, entry_id);
3356 }
3357
3358 fn append_specified_entries(
3359 s: &mut Sdcons,
3360 channel_id: u64,
3361 channel_term: u64,
3362 prev_id: u64,
3363 prev_term: u64,
3364 entries: Vec<Entry>,
3365 ) {
3366 append_specified_entries_with_id(
3367 s,
3368 channel_id,
3369 channel_term,
3370 prev_id,
3371 prev_term,
3372 0,
3373 entries,
3374 );
3375 }
3376
3377 fn append_specified_entries_with_id(
3378 s: &mut Sdcons,
3379 channel_id: u64,
3380 channel_term: u64,
3381 prev_id: u64,
3382 prev_term: u64,
3383 committed_entry_id: u64,
3384 entries: Vec<Entry>,
3385 ) {
3386 let msg = Message {
3387 from: 2,
3388 to: 1,
3389 index_term: s.get_index_term(),
3390 channel_id,
3391 channel_term,
3392 detail: MsgDetail::Append(AppendMsg {
3393 committed_entry_id,
3394 prev_entry_id: prev_id,
3395 prev_entry_term: prev_term,
3396 entries,
3397 }),
3398 };
3399 s.step(msg);
3400 }
3401
3402 fn append_entries(
3403 s: &mut Sdcons,
3404 channel_id: u64,
3405 channel_term: u64,
3406 prev_id: u64,
3407 prev_term: u64,
3408 requests: &Vec<u64>,
3409 ) {
3410 append_entries_with_id(s, channel_id, channel_term, prev_id, prev_term, 0, requests);
3411 }
3412
3413 fn append_entries_with_id(
3414 s: &mut Sdcons,
3415 channel_id: u64,
3416 channel_term: u64,
3417 prev_id: u64,
3418 prev_term: u64,
3419 committed_entry_id: u64,
3420 requests: &Vec<u64>,
3421 ) {
3422 let mut entries = Vec::new();
3423 let mut id = prev_id + 1;
3424 for request_id in requests {
3425 entries.push(Entry {
3426 request_id: *request_id + FIRST_USER_REQUEST,
3427 channel_id,
3428 channel_term,
3429 entry_id: id,
3430 index_id: INVALID_ID,
3431 message: vec![1, 2, 3],
3432 context: None,
3433 configs: None,
3434 });
3435 id += 1;
3436 }
3437
3438 debug!("append entries {:?} with term {}", requests, channel_term);
3439 append_specified_entries_with_id(
3440 s,
3441 channel_id,
3442 channel_term,
3443 prev_id,
3444 prev_term,
3445 committed_entry_id,
3446 entries,
3447 );
3448 }
3449
    /// Follower-side append handling on channel 2 after node 2 declares
    /// itself leader.
    ///
    /// Each step checks the single `AppendReply` (reject flag and hint id)
    /// returned for a particular previous id/term and term combination.
    #[test]
    fn receive_entries() {
        let meta = init_log_meta_with_default_members();
        let mut s = init_sdcons(&meta);
        let index_term = 10;
        let channel_id = 2;
        let channel_term = 11;
        declare_leader(&mut s, index_term, channel_id, channel_term);

        // The previous entry (id 10, term 2) does not exist locally: rejected, hint 1.
        append_entries(&mut s, channel_id, channel_term, 10, 2, &vec![1, 2, 3]);
        let ready = s.advance(&meta);
        assert_append_reply(&ready, channel_id, 1, true);

        // Entries 1..=3 from an empty prefix: accepted, hint 4.
        append_entries(&mut s, channel_id, channel_term, 0, 0, &vec![1, 2, 3]);
        let ready = s.advance(&meta);
        assert_append_reply(&ready, channel_id, 4, false);

        // Newer term overwrites from entry 3 (ids 3..=5): accepted, hint 6.
        append_entries(&mut s, channel_id, channel_term + 1, 2, channel_term, &vec![3, 4, 5]);
        let ready = s.advance(&meta);
        assert_append_reply(&ready, channel_id, 6, false);

        // Entry 5 now has term `channel_term + 1`, so this previous term
        // mismatches: rejected, hint 3.
        append_entries(&mut s, channel_id, channel_term + 2, 5, channel_term, &vec![5, 6, 7]);
        let ready = s.advance(&meta);
        assert_append_reply(&ready, channel_id, 3, true);

        // Matching previous term, committing up to 5 (ids 6..=8): accepted, hint 9.
        append_entries_with_id(
            &mut s,
            channel_id,
            channel_term + 2,
            5,
            channel_term + 1,
            5,
            &vec![6, 7, 8],
        );
        let ready = s.advance(&meta);
        assert_append_reply(&ready, channel_id, 9, false);

        // Append after entry 3 with an older previous term is still accepted,
        // with hint 6 (presumably because entries up to the committed id 5 are
        // settled; TODO confirm).
        append_entries(&mut s, channel_id, channel_term + 2, 3, channel_term, &vec![4, 5]);
        let ready = s.advance(&meta);
        assert_append_reply(&ready, channel_id, 6, false);
    }
3497
3498 fn accept_append_entries(ready: &Ready, s: &mut Sdcons, skip_nodes: &HashSet<u64>) {
3499 let msgs = ready
3500 .msgs
3501 .iter()
3502 .map(|(_id, msgs)| msgs.iter())
3503 .flatten()
3504 .collect::<Vec<_>>();
3505 for msg in msgs {
3506 if let MsgDetail::Append(p) = &msg.detail {
3507 match p.entries.last() {
3508 Some(e) => {
3509 if skip_nodes.contains(&msg.to) {
3510 debug!("skip append entries to node {}", msg.to);
3511 continue;
3512 }
3513 let reply = Message {
3514 from: msg.to,
3515 to: msg.from,
3516 index_term: msg.index_term,
3517 channel_id: msg.channel_id,
3518 channel_term: msg.channel_term,
3519 detail: MsgDetail::AppendReply(AppendReplyMsg {
3520 entry_id: p.prev_entry_id + 1u64,
3521 hint_id: e.entry_id + 1u64,
3522 reject: false,
3523 }),
3524 };
3525 s.step(reply);
3526 }
3527 _ => {}
3528 }
3529 }
3530 }
3531 }
3532
3533 fn accept_indexes(ready: &Ready, s: &mut Sdcons, skip_nodes: &HashSet<u64>) {
3534 let msgs = ready
3535 .msgs
3536 .iter()
3537 .map(|(_id, msgs)| msgs.iter())
3538 .flatten()
3539 .collect::<Vec<_>>();
3540 for msg in &msgs {
3541 if let MsgDetail::Index(p) = &msg.detail {
3542 match p.indexes.last() {
3543 Some(i) => {
3544 if skip_nodes.contains(&msg.to) {
3545 debug!("skip append entries to node {}", msg.to);
3546 continue;
3547 }
3548 let reply = Message {
3549 from: msg.to,
3550 to: msg.from,
3551 index_term: msg.index_term,
3552 channel_id: msg.channel_id,
3553 channel_term: msg.channel_term,
3554 detail: MsgDetail::IndexReply(IndexReplyMsg {
3555 index_id: p.prev_index_id,
3556 hint_id: i.index_id + 1u64,
3557 reject: false,
3558 }),
3559 };
3560 s.step(reply);
3561 }
3562 _ => {}
3563 }
3564 }
3565 }
3566 }
3567
    /// Drives a leader through two rounds of proposals and checks when the
    /// proposed requests appear in `Ready::chosen_entries`.
    ///
    /// Round one: the request is not chosen after peers acknowledge, nor after
    /// the entries alone are marked stable. It is chosen only once the indexes
    /// are marked stable as well. Round two: with node 2's acknowledgements
    /// withheld, requests 2-4 are still chosen once everything is marked stable.
    /// Presumably the remaining default members still form a majority;
    /// NOTE(review): confirm against `init_log_meta_with_default_members`.
    #[test]
    fn multi_member_chosen() {
        let meta = init_log_meta_with_default_members();
        let mut s = init_sdcons(&meta);
        to_leader(&mut s, &meta);
        s.enter_replicate_state();

        let requests = vec![1];
        submit_tasks(&mut s, &requests);

        // Round one: every peer acknowledges.
        let mut skip_nodes = HashSet::new();

        let ready = s.advance(&meta);
        assert_eq!(ready.chosen_entries.len(), 0);
        accept_append_entries(&ready, &mut s, &skip_nodes);
        accept_indexes(&ready, &mut s, &skip_nodes);

        // Peers have acknowledged, but nothing is marked stable locally yet.
        let ready = s.advance(&meta);
        assert_eq!(ready.chosen_entries.len(), 0);

        // Marking only the entries stable is still not enough.
        stable_all_entries(&ready, &mut s);
        let ready = s.advance(&meta);
        assert_eq!(ready.chosen_entries.len(), 0);

        // Once the indexes are marked stable too, the request is chosen.
        stable_indexes(&ready, &mut s);
        let ready = s.advance(&meta);
        assert_chosen_entries(&ready, &requests);
        s.submit_apply_result(ready.first_apply_index_id, ready.last_apply_index_id);

        // The same request is not reported as chosen a second time.
        let ready = s.advance(&meta);
        assert_eq!(ready.chosen_entries.len(), 0);

        // Round two: node 2 no longer acknowledges.
        skip_nodes.insert(2);
        let requests = vec![2, 3, 4];
        submit_tasks(&mut s, &requests);

        let ready = s.advance(&meta);
        accept_append_entries(&ready, &mut s, &skip_nodes);
        accept_indexes(&ready, &mut s, &skip_nodes);

        let ready = s.advance(&meta);
        assert_eq!(ready.chosen_entries.len(), 0);

        stable_all_entries(&ready, &mut s);
        stable_indexes(&ready, &mut s);

        let ready = s.advance(&meta);
        assert_chosen_entries(&ready, &requests);
        s.submit_apply_result(ready.first_apply_index_id, ready.last_apply_index_id);

        let ready = s.advance(&meta);
        assert_eq!(ready.chosen_entries.len(), 0);
    }
3625
3626 #[test]
3627 fn multi_member_receive_chosen() {
3628 let meta = init_log_meta_with_default_members();
3629 let mut s = init_sdcons(&meta);
3630
3631 let requests = vec![1, 2, 3];
3632 let indexes = vec![
3633 new_index_with_id(2, 1, 1, 2),
3634 new_index_with_id(2, 2, 2, 2),
3635 new_index_with_id(2, 3, 3, 2),
3636 ];
3637 append_entries(&mut s, 2, 2, INVALID_ID, INITIAL_TERM, &requests);
3638 replicate_indexes(2, INVALID_ID, INITIAL_TERM, &indexes, &mut s);
3639 declare_with_id(&mut s, 2, 2, 2, 2, 3); declare_with_id(&mut s, 2, 2, INDEX_CHANNEL_ID, 2, 3); let ready = s.advance(&meta);
3643 assert_eq!(ready.chosen_entries.len(), 0);
3644 stable_all_entries(&ready, &mut s);
3645 stable_indexes(&ready, &mut s);
3646
3647 let ready = s.advance(&meta);
3648 assert_chosen_entries(&ready, &requests);
3649 }
3650
3651 fn replicate_indexes(
3652 from: u64,
3653 prev_id: u64,
3654 prev_term: u64,
3655 indexes: &Vec<LogIndex>,
3656 s: &mut Sdcons,
3657 ) {
3658 let index_term = s.get_index_term();
3659 let msg = Message {
3660 from,
3661 to: s.id,
3662 index_term,
3663 channel_id: INDEX_CHANNEL_ID,
3664 channel_term: index_term,
3665 detail: MsgDetail::Index(IndexMsg {
3666 committed_index_id: 0,
3667 prev_index_id: prev_id,
3668 prev_index_term: prev_term,
3669 indexes: indexes.clone(),
3670 }),
3671 };
3672 s.step(msg);
3673 }
3674
3675 #[test]
3676 fn only_assign_index_once() {
3677 let meta = init_log_meta_with_members(&vec![1, 2, 3, 4, 5]);
3684 let mut s = init_sdcons(&meta);
3685
3686 let indexes = vec![new_index_with_id(2, 1, 1, 1)];
3688 replicate_indexes(2, INVALID_ID, INITIAL_TERM, &indexes, &mut s);
3689
3690 to_leader(&mut s, &meta);
3692
3693 append_entries(&mut s, 2, 2, INVALID_ID, INITIAL_TERM, &vec![1]);
3695
3696 let ready = s.advance(&meta);
3698 let index_term = s.get_index_term();
3699 stable_all_entries(&ready, &mut s);
3700 stable_indexes(&ready, &mut s);
3701 accept_append_entries(&ready, &mut s, &HashSet::new());
3702 accept_indexes(&ready, &mut s, &HashSet::new());
3703 declare_with_id(&mut s, 2, index_term, 2, 2, 1); assert_eq!(ready.chosen_entries.len(), 0);
3705
3706 debug!("validate advance indexes: {:?}", s.log_buffer.indexes);
3707 let ready = s.advance(&meta);
3708 assert_chosen_entries(&ready, &vec![1]);
3709 }
3710
3711 #[test]
3712 fn enter_both_stage() {
3713 let meta = init_log_meta_with_members(&vec![1]);
3714 let mut s = init_sdcons(&meta);
3715 to_leader(&mut s, &meta);
3716
3717 assert_eq!(s.pending_configs.is_none(), true);
3718
3719 s.change_config(vec![1, 2, 3]).expect("success");
3720 assert_eq!(s.pending_configs.is_some(), true);
3721 assert_eq!(s.pending_configs.as_ref().unwrap().stage, ConfigStage::Old);
3722
3723 s.tick();
3724 s.advance(&meta);
3725 assert_eq!(s.pending_configs.is_some(), true);
3726 assert_eq!(s.pending_configs.as_ref().unwrap().stage, ConfigStage::Both);
3727 }
3728
    /// While a change from members {1, 2, 3} to {1, 4, 5} is in the `Both`
    /// stage, acknowledgements without nodes 4 and 5 get nothing chosen. Once 4
    /// and 5 also acknowledge, the config-change entry is chosen.
    #[test]
    fn committed_should_reach_majority_in_both_stage() {
        let meta = init_log_meta_with_members(&vec![1, 2, 3]);
        let mut s = init_sdcons(&meta);
        to_leader(&mut s, &meta);

        assert_eq!(s.pending_configs.is_none(), true);

        // Propose replacing members 2 and 3 with 4 and 5; the change starts in `Old`.
        s.change_config(vec![1, 4, 5]).expect("success");
        assert_eq!(s.pending_configs.is_some(), true);
        assert_eq!(s.pending_configs.as_ref().unwrap().stage, ConfigStage::Old);

        // After a tick this node is a candidate for channels 2 and 3, and the
        // change is still in `Old`.
        s.tick();
        assert_eq!(s.is_channel_candidate(2), true);
        assert_eq!(s.is_channel_candidate(3), true);
        assert_eq!(s.pending_configs.is_some(), true);
        assert_eq!(s.pending_configs.as_ref().unwrap().stage, ConfigStage::Old);

        // Grant the prepares and acknowledge all traffic, after which this node
        // leads channels 2 and 3.
        let mut ready = s.advance(&meta);
        accept_prepare(&mut ready, &mut s, HashMap::new());
        accept_append_entries(&ready, &mut s, &HashSet::new());
        accept_indexes(&ready, &mut s, &HashSet::new());
        assert_eq!(s.is_channel_leader(2), true);
        assert_eq!(s.is_channel_leader(3), true);

        // The next tick moves the change into `Both`.
        s.tick();
        assert_eq!(s.pending_configs.is_some(), true);
        assert_eq!(s.pending_configs.as_ref().unwrap().stage, ConfigStage::Both);

        s.enter_replicate_state();

        // Withhold every reply from nodes 4 and 5.
        let mut skip_nodes = HashSet::new();
        skip_nodes.insert(4);
        skip_nodes.insert(5);
        let ready = s.advance(&meta);
        let apply_id = ready.last_apply_index_id;
        apply_entries(&ready, &mut s);
        stable_all_entries(&ready, &mut s);
        stable_indexes(&ready, &mut s);
        accept_append_entries(&ready, &mut s, &skip_nodes);
        accept_indexes(&ready, &mut s, &skip_nodes);
        // With 4 and 5 silent, nothing is chosen. The next apply range starts
        // right after the previously reported one.
        let ready = s.advance(&meta);
        assert_eq!(ready.first_apply_index_id, apply_id + 1);
        assert_eq!(ready.chosen_entries.len(), 0);

        s.enter_replicate_state();

        debug!("both stage recieve marjority response");
        // Presumably rewinds replication to nodes 4 and 5 so the withheld
        // messages are produced again. NOTE(review): confirm `reset_member_next_id`.
        s.reset_member_next_id(vec![4, 5]);

        // With every node acknowledging, the config-change entry is chosen.
        let ready = s.advance(&meta);
        accept_append_entries(&ready, &mut s, &HashSet::new());
        accept_indexes(&ready, &mut s, &HashSet::new());
        let ready = s.advance(&meta);
        assert_chosen_config_change_entries(&ready);
    }
3786
3787 fn to_both_stage(s: &mut Sdcons, log_meta: &LogMeta, new_configs: Vec<u64>) {
3788 to_leader(s, log_meta);
3789 s.change_config(new_configs).expect("success");
3790 s.tick();
3791
3792 let mut ready = s.advance(log_meta);
3794 accept_prepare(&mut ready, s, HashMap::new());
3795 accept_append_entries(&ready, s, &HashSet::new());
3796 accept_indexes(&ready, s, &HashSet::new());
3797
3798 s.enter_replicate_state();
3800 s.tick();
3801 }
3802
3803 fn advance_all(s: &mut Sdcons, log_meta: &LogMeta, skip_nodes: &HashSet<u64>) {
3804 let ready = s.advance(log_meta);
3805 apply_entries(&ready, s);
3806 stable_all_entries(&ready, s);
3807 stable_indexes(&ready, s);
3808 accept_append_entries(&ready, s, skip_nodes);
3809 accept_indexes(&ready, s, skip_nodes);
3810 }
3811
3812 fn advance_all_and_count_msg<F>(
3813 s: &mut Sdcons,
3814 log_meta: &LogMeta,
3815 skip_nodes: &HashSet<u64>,
3816 is_expect_msg_fn: F,
3817 ) -> usize
3818 where
3819 F: Fn(&Message) -> bool,
3820 {
3821 let ready = s.advance(log_meta);
3822 apply_entries(&ready, s);
3823 stable_all_entries(&ready, s);
3824 stable_indexes(&ready, s);
3825 accept_append_entries(&ready, s, skip_nodes);
3826 accept_indexes(&ready, s, skip_nodes);
3827
3828 ready
3829 .msgs
3830 .iter()
3831 .map(|(_id, msgs)| msgs.iter())
3832 .flatten()
3833 .filter(|msg| is_expect_msg_fn(msg))
3834 .count()
3835 }
3836
3837 fn to_new_stage(s: &mut Sdcons, log_meta: &LogMeta, new_configs: Vec<u64>) {
3838 to_both_stage(s, log_meta, new_configs);
3839 s.enter_replicate_state();
3840 advance_all(s, log_meta, &HashSet::new());
3842 advance_all(s, log_meta, &HashSet::new());
3844
3845 s.tick();
3847 assert_eq!(s.is_pending_configs_in_new_stage(), true);
3848 }
3849
3850 #[test]
3851 fn apply_config_change() {
3852 let meta = init_log_meta_with_members(&vec![1, 2, 3]);
3853 let mut s = init_sdcons(&meta);
3854 to_new_stage(&mut s, &meta, vec![1, 4, 5]);
3855
3856 debug!("should apply config changes");
3857
3858 submit_tasks(&mut s, &vec![1, 2, 3]);
3859 let ready = s.advance(&meta);
3860 apply_entries(&ready, &mut s);
3861 stable_all_entries(&ready, &mut s);
3862 stable_indexes(&ready, &mut s);
3863
3864 let skip_nodes: HashSet<u64> = vec![1, 3].iter().cloned().collect();
3866 accept_append_entries(&ready, &mut s, &skip_nodes);
3867 accept_indexes(&ready, &mut s, &skip_nodes);
3868
3869 s.tick();
3871 let ready = s.advance(&meta);
3872 assert_chosen_entries(&ready, &vec![1, 2, 3]);
3873 let _ = s.advance(&meta);
3874 assert_eq!(s.pending_configs.is_none(), true);
3875 }
3876
    /// A pending `Both`-stage config change is replaced when its index slot is
    /// overwritten by a higher-term index pointing at a `New`-stage config-change
    /// entry on another channel.
    #[test]
    fn rollback_config_change() {
        let meta = init_log_meta_with_members(&vec![1, 2, 3]);
        let mut s = init_sdcons(&meta);
        let next_id = INVALID_ID + 1u64;
        let mut channel_id = 2;
        // Config-change entry (stage `Both`, members {1, 4, 5}) at the first
        // entry id of channel 2, term 1.
        let mut entry = Entry {
            request_id: CONFIG_CHANGE_ID,
            channel_id: channel_id,
            channel_term: 1,
            entry_id: next_id,
            index_id: INVALID_ID,
            message: vec![],
            context: None,
            configs: Some(ChangeConfig {
                index_id: next_id,
                entry_id: next_id,
                term: 1,
                stage: ConfigStage::Both,
                members: HashSet::from_iter(vec![1, 4, 5].into_iter()),
            }),
        };
        // Index slot `next_id`, pointing at that entry.
        let mut index = LogIndex {
            channel_id: channel_id,
            entry_id: next_id,
            index_id: next_id,
            term: 1,
            context: None,
        };

        assert_eq!(s.pending_configs.is_none(), true);
        // Deliver index and entry from node `channel_id` (2), then process the round.
        replicate_indexes(channel_id, INVALID_ID, INITIAL_TERM, &vec![index.clone()], &mut s);
        append_specified_entries(
            &mut s,
            channel_id,
            1,
            INVALID_ID,
            INITIAL_TERM,
            vec![entry.clone()],
        );
        advance_all(&mut s, &meta, &HashSet::new());

        assert_eq!(s.pending_configs.is_some(), true);
        let pending_configs = s.pending_configs.as_ref().unwrap();
        assert_eq!(pending_configs.stage, ConfigStage::Both);

        // Rewrite the same index slot (prev is still INVALID_ID) with term 2,
        // now pointing at channel 3, whose entry carries a `New`-stage config.
        channel_id += 1;
        index.channel_id = channel_id;
        index.term = 2;
        entry.channel_id = channel_id;
        entry.channel_term = 2;
        entry.configs = Some(ChangeConfig {
            index_id: next_id,
            entry_id: next_id,
            term: 2,
            stage: ConfigStage::New,
            members: HashSet::from_iter(vec![1, 4, 5].into_iter()),
        });
        replicate_indexes(channel_id, INVALID_ID, INITIAL_TERM, &vec![index], &mut s);
        // NOTE(review): the append is sent with channel term 1 while the entry
        // carries channel_term 2. Confirm this mismatch is intentional.
        append_specified_entries(&mut s, channel_id, 1, INVALID_ID, INITIAL_TERM, vec![entry]);
        advance_all(&mut s, &meta, &HashSet::new());

        // The pending change now reflects the `New`-stage record, and member 2
        // is no longer part of `configs`.
        assert_eq!(s.pending_configs.is_some(), true);
        let pending_configs = s.pending_configs.as_ref().unwrap();
        assert_eq!(pending_configs.stage, ConfigStage::New);
        assert_eq!(pending_configs.configs.contains(&2), false);
    }
3945
    /// Recovering from storage that holds a `Both`-stage config-change entry
    /// (and its index) rebuilds the pending config change. It also keeps a
    /// channel for every old and new member.
    #[test]
    fn recovery_both_stage_config_change() {
        // Member 1 is tagged `Both`, 2 and 3 are `Old`; all three are applied.
        let mut log_meta = LogMeta::default();
        let mut desc = MemberState {
            stage: ConfigStage::Both,
            applied: true,
        };
        log_meta.members.insert(1, desc.clone());
        desc.stage = ConfigStage::Old;
        log_meta.members.insert(2, desc.clone());
        log_meta.members.insert(3, desc.clone());

        // Members 4 and 5 are tagged `New` and not yet applied.
        desc.stage = ConfigStage::New;
        desc.applied = false;
        log_meta.members.insert(4, desc.clone());
        log_meta.members.insert(5, desc.clone());

        // Append an empty batch to the channel named by each member id.
        let local_id = 1;
        let mut mem_store = MemStorage::new(local_id);
        for (id, _) in &log_meta.members {
            mem_store.append_entries(*id, vec![]);
        }

        // Stored config-change entry: stage `Both`, members {1, 4, 5}, on channel 2.
        let next_id = INVALID_ID + 1u64;
        let channel_id = 2;
        let entry = Entry {
            request_id: CONFIG_CHANGE_ID,
            channel_id: channel_id,
            channel_term: 1,
            entry_id: next_id,
            index_id: INVALID_ID,
            message: vec![],
            context: None,
            configs: Some(ChangeConfig {
                index_id: next_id,
                entry_id: next_id,
                term: 1,
                stage: ConfigStage::Both,
                members: HashSet::from_iter(vec![1, 4, 5].into_iter()),
            }),
        };
        mem_store.append_entries(channel_id, vec![entry]);

        let index = LogIndex {
            channel_id: channel_id,
            entry_id: next_id,
            index_id: next_id,
            term: 1,
            context: None,
        };
        mem_store.extend_indexes(vec![index]);

        // Recovery rebuilds the pending change. Member 1 appears in `configs`
        // but in neither `old_configs` nor `new_configs`.
        let mut s = init_sdcons_with_mem_store(&log_meta, mem_store);
        assert_eq!(s.pending_configs.is_some(), true);
        let pending_configs = s.pending_configs.as_ref().unwrap();
        assert_eq!(pending_configs.stage, ConfigStage::Both);
        assert_eq!(pending_configs.old_configs.contains(&2), true);
        assert_eq!(pending_configs.old_configs.contains(&3), true);
        assert_eq!(pending_configs.new_configs.contains(&1), false);
        assert_eq!(pending_configs.old_configs.contains(&1), false);
        assert_eq!(pending_configs.configs.contains(&1), true);

        s.tick();
        s.advance(&log_meta);

        // All old and new members still have channels.
        let channel_ids = vec![INDEX_CHANNEL_ID, 1, 2, 3, 4, 5];
        assert_eq!(s.channel_ids, channel_ids);
    }
4017
    /// Recovering from storage that holds a `New`-stage config-change entry
    /// rebuilds the pending change. After the two declarations below and an
    /// advance, only the index channel and the new members' channels remain.
    #[test]
    fn recovery_new_stage_config_change() {
        // Member 1 is tagged `Both`, 2 and 3 are `Old`; all three are applied.
        let mut log_meta = LogMeta::default();
        let mut desc = MemberState {
            stage: ConfigStage::Both,
            applied: true,
        };

        log_meta.members.insert(1, desc.clone());
        desc.stage = ConfigStage::Old;
        log_meta.members.insert(2, desc.clone());
        log_meta.members.insert(3, desc.clone());

        // Members 4 and 5 are tagged `New` and not yet applied.
        desc.stage = ConfigStage::New;
        desc.applied = false;
        log_meta.members.insert(4, desc.clone());
        log_meta.members.insert(5, desc.clone());

        // Append an empty batch to the channel named by each member id.
        let local_id = 1;
        let mut mem_store = MemStorage::new(local_id);
        for (id, _) in &log_meta.members {
            mem_store.append_entries(*id, vec![]);
        }

        // Stored config-change entry: stage `New`, members {1, 4, 5}, on channel 2.
        let next_id = INVALID_ID + 1u64;
        let channel_id = 2;
        let entry = Entry {
            request_id: CONFIG_CHANGE_ID,
            channel_id: channel_id,
            channel_term: 1,
            entry_id: next_id,
            index_id: INVALID_ID,
            message: vec![],
            context: None,
            configs: Some(ChangeConfig {
                index_id: next_id,
                entry_id: next_id,
                term: 1,
                stage: ConfigStage::New,
                members: HashSet::from_iter(vec![1, 4, 5].into_iter()),
            }),
        };
        mem_store.append_entries(channel_id, vec![entry]);

        let index = LogIndex {
            channel_id: channel_id,
            entry_id: next_id,
            index_id: next_id,
            term: 1,
            context: None,
        };
        mem_store.extend_indexes(vec![index]);

        let mut s = init_sdcons_with_mem_store(&log_meta, mem_store);
        assert_eq!(s.pending_configs.is_some(), true);
        let pending_configs = s.pending_configs.as_ref().unwrap();
        info!("pending configs: {:?}", pending_configs);
        assert_eq!(pending_configs.stage, ConfigStage::New);
        assert_eq!(pending_configs.old_configs.contains(&2), true);
        assert_eq!(pending_configs.old_configs.contains(&3), true);
        assert_eq!(pending_configs.old_configs.contains(&1), false);
        assert_eq!(pending_configs.configs.contains(&1), true);

        // Declarations on channel 0 (NOTE(review): presumably the value of
        // INDEX_CHANNEL_ID) and on channel 2.
        declare_with_id(&mut s, 2, 2, 0, 2, 1);
        declare_with_id(&mut s, 2, 2, 2, 2, 1);

        // Channels 2 and 3 of the old members are gone afterwards.
        s.advance(&log_meta);
        let channel_ids = vec![INDEX_CHANNEL_ID, 1, 4, 5];
        assert_eq!(s.channel_ids, channel_ids);
    }
4094
4095 fn is_timeout_now(msg: &Message) -> bool {
4096 println!("msg: {:?}", msg);
4097 if let MsgDetail::TimeoutNow = msg.detail {
4098 true
4099 } else {
4100 false
4101 }
4102 }
4103
4104 #[test]
4105 fn transfer_leadership_directly() {
4106 let meta = init_log_meta_with_members(&vec![1, 2, 3]);
4107 let mut s = init_sdcons(&meta);
4108 let res = s.control(Control::TransferLeader(2));
4109 assert_eq!(res.is_err(), true);
4110
4111 to_leader(&mut s, &meta);
4112 advance_all(&mut s, &meta, &HashSet::new());
4113
4114 let res = s.control(Control::TransferLeader(2));
4115 assert_eq!(res.is_ok(), true);
4116
4117 let res = s.control(Control::TransferLeader(2));
4118 assert_eq!(res.is_ok(), true);
4119
4120 let count = advance_all_and_count_msg(&mut s, &meta, &HashSet::new(), is_timeout_now);
4121 assert_eq!(count > 0, true);
4122 }
4123
4124 #[test]
4125 fn transfer_leadership_after_matched() {
4126 let meta = init_log_meta_with_members(&vec![1, 2, 3]);
4127 let mut s = init_sdcons(&meta);
4128 to_leader(&mut s, &meta);
4129
4130 let requests = vec![1];
4131 submit_tasks(&mut s, &requests);
4132
4133 let res = s.control(Control::TransferLeader(2));
4134 assert_eq!(res.is_ok(), true);
4135
4136 let count = advance_all_and_count_msg(&mut s, &meta, &HashSet::new(), is_timeout_now);
4137 assert_eq!(count, 0);
4138
4139 let last_id = s.channel_next_entry_id(INDEX_CHANNEL_ID) - 1;
4140 assert_eq!(s.remote_matched_id(INDEX_CHANNEL_ID, 2), last_id);
4141
4142 let count = advance_all_and_count_msg(&mut s, &meta, &HashSet::new(), is_timeout_now);
4143 assert_eq!(count > 0, true);
4144 }
4145
4146 #[test]
4147 fn index_leader_read_index() {
4148 let meta = init_log_meta_with_members(&vec![1]);
4149 let mut mem_store = MemStorage::new(1);
4150 mem_store.append_entries(1, vec![]);
4151 mem_store.append_entries(2, vec![]);
4152 mem_store.append_entries(3, vec![]);
4153 mem_store
4154 .extend_indexes(vec![new_index_with_id(1, 1, 1, 2), new_index_with_id(1, 2, 3, 4)]);
4155 let mut s = init_sdcons_with_mem_store(&meta, mem_store);
4156
4157 to_leader(&mut s, &meta);
4158
4159 let r = s.leased_read();
4160 assert_eq!(r.is_ok(), true);
4161 let request_id = r.unwrap();
4162
4163 submit_tasks(&mut s, &vec![4, 5, 6]);
4164
4165 let ready = s.advance(&meta);
4168 apply_entries(&ready, &mut s);
4169 stable_all_entries(&ready, &mut s);
4170 stable_indexes(&ready, &mut s);
4171 assert_eq!(ready.finished_reads.contains_key(&request_id), true);
4172 assert_eq!(*ready.finished_reads.get(&request_id).unwrap(), 2);
4173
4174 s.advance(&meta);
4175
4176 let request_id = s.leased_read().unwrap();
4178 let ready = s.advance(&meta);
4179 assert_eq!(ready.finished_reads.contains_key(&request_id), true);
4180 info!("print {:?}", ready.finished_reads);
4181 assert_eq!(*ready.finished_reads.get(&request_id).unwrap() >= 6, true);
4182 }
4183
4184 #[test]
4185 fn index_leader_receive_read_index() {
4186 let meta = init_log_meta_with_members(&vec![1]);
4187 let mut mem_store = MemStorage::new(1);
4188 mem_store.append_entries(1, vec![]);
4189 mem_store.append_entries(2, vec![]);
4190 mem_store.append_entries(3, vec![]);
4191 mem_store
4192 .extend_indexes(vec![new_index_with_id(1, 1, 1, 2), new_index_with_id(1, 2, 3, 4)]);
4193 let mut s = init_sdcons_with_mem_store(&meta, mem_store);
4194
4195 to_leader(&mut s, &meta);
4196
4197 let sender = 2;
4198 let request_id = 123;
4199 let mut msg = s.build_msg_header(INDEX_CHANNEL_ID);
4200 msg.from = sender;
4201 msg.to = 1;
4202 msg.detail = MsgDetail::Read(ReadMsg { request_id });
4203 s.step(msg);
4204
4205 submit_tasks(&mut s, &vec![4, 5, 6]);
4206
4207 let ready = s.advance(&meta);
4210 apply_entries(&ready, &mut s);
4211 stable_all_entries(&ready, &mut s);
4212 stable_indexes(&ready, &mut s);
4213 assert_eq!(ready.msgs.len(), 1);
4214 info!("print {:?}", ready.msgs);
4215 let msgs = ready.msgs.get(&INDEX_CHANNEL_ID).unwrap();
4216 assert_eq!(msgs.len(), 1);
4217 if let MsgDetail::ReadReply(reply) = &msgs[0].detail {
4218 assert_eq!(reply.request_id, request_id);
4219 assert_eq!(reply.recommend_id, 2);
4220 }
4221
4222 s.advance(&meta);
4223
4224 let request_id = 123123;
4226 let mut msg = s.build_msg_header(INDEX_CHANNEL_ID);
4227 msg.from = sender;
4228 msg.to = 1;
4229 msg.detail = MsgDetail::Read(ReadMsg { request_id });
4230 s.step(msg);
4231
4232 let ready = s.advance(&meta);
4233 assert_eq!(ready.msgs.len(), 1);
4234 let msgs = ready.msgs.get(&INDEX_CHANNEL_ID).unwrap();
4235 assert_eq!(msgs.len(), 1);
4236 if let MsgDetail::ReadReply(reply) = &msgs[0].detail {
4237 assert_eq!(reply.request_id, request_id);
4238 assert_eq!(reply.recommend_id >= 6, true);
4239 }
4240 }
4241
4242 #[test]
4243 fn follower_retry_read_index_when_heartbeat() {
4244 let mut log_meta = init_log_meta_with_default_members();
4245 let mut s = init_sdcons(&log_meta);
4246
4247 to_leader(&mut s, &log_meta);
4248 let next_channel_term = s.get_index_term() + 1;
4249 declare_leader(&mut s, next_channel_term, INDEX_CHANNEL_ID, next_channel_term);
4250
4251 assert_eq!(s.is_index_leader(), false);
4252 assert_eq!(s.is_local_channel_leader(), true);
4253 s.advance(&log_meta);
4254
4255 let request_id = s.leased_read().unwrap();
4257 let ready = s.advance(&log_meta);
4258 assert_eq!(ready.msgs.len(), 1);
4259 let msgs = ready.msgs.get(&INDEX_CHANNEL_ID).unwrap();
4260 info!("{:?}", msgs);
4261 assert_eq!(msgs.len(), 1);
4262 if let MsgDetail::Read(req) = &msgs[0].detail {
4263 assert_eq!(req.request_id, request_id);
4264 }
4265
4266 s.tick();
4268 let ready = s.advance(&log_meta);
4269 let msgs = ready.msgs.get(&INDEX_CHANNEL_ID).unwrap();
4270 info!("{:?}", msgs);
4271 assert_eq!(msgs.len(), 1);
4272 if let MsgDetail::Read(req) = &msgs[0].detail {
4273 assert_eq!(req.request_id, request_id);
4274 }
4275
4276 let sender = 2;
4278 let mut msg = s.build_msg_header(INDEX_CHANNEL_ID);
4279 msg.from = sender;
4280 msg.to = 1;
4281 msg.detail = MsgDetail::ReadReply(ReadReplyMsg {
4282 request_id,
4283 recommend_id: 6,
4284 });
4285 s.step(msg);
4286 let ready = s.advance(&log_meta);
4287 assert_eq!(ready.finished_reads.contains_key(&request_id), true);
4288 info!("print {:?}", ready.finished_reads);
4289 assert_eq!(*ready.finished_reads.get(&request_id).unwrap() >= 6, true);
4290 }
4291}