Skip to main content

sdcons/
cons.rs

1// Copyright 2021 The sdcons Authors. Licensed under Apache-2.0.
2
3// Copyright 2015 The etcd Authors
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9//     http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17use std::collections::{HashMap, HashSet, VecDeque};
18
19use chrono::prelude::*;
20use log::{debug, error, info, trace, warn};
21use rand::{thread_rng, RngCore};
22
23use crate::constant::*;
24use crate::error::Error;
25use crate::progress::*;
26use crate::storage::*;
27use crate::types::*;
28
/// The options used to create a sdcons node.
#[derive(Debug)]
pub struct SdconsOption {
    /// Specifies the base election timeout tick of sdcons. A sdcons node
    /// randomly chooses a value in [`base_election_timeout_tick`,
    /// `2 * base_election_timeout_tick`] as its election timeout. If the node
    /// has not received anything (presumably from the leader) by the time the
    /// election timeout elapses, it promotes its term and participates in the
    /// election of the new term.
    ///
    /// default: 3
    pub base_election_timeout_tick: u32,

    /// Specifies the approximate limit, in bytes, of each write task issued by
    /// sdcons.
    ///
    /// default: 64MB
    pub max_stable_bytes: u64,

    /// Specifies the approximate limit, in bytes, of each apply task issued by
    /// sdcons.
    ///
    /// default: 64MB
    pub max_apply_bytes: u64,

    /// Specifies the approximate limit, in bytes, of each log replication.
    ///
    /// default: 128MB
    pub max_bcast_bytes: u64,
}

impl Default for SdconsOption {
    fn default() -> SdconsOption {
        // One mebibyte; keeps the byte limits below free of digit typos.
        const MIB: u64 = 1024 * 1024;
        SdconsOption {
            base_election_timeout_tick: 3,
            max_stable_bytes: 64 * MIB,
            max_apply_bytes: 64 * MIB,
            max_bcast_bytes: 128 * MIB,
        }
    }
}
71
72/// Randomly generate the next election timeout tick between `[base, 2 * base]`.
73#[inline(always)]
74fn generate_election_timeout_tick(base: u32) -> u32 {
75    base + thread_rng().next_u32() % base + 1
76}
77
78/// Is a message sended from the leader of the channel specified by
79/// `target_channel_id`?
80#[inline(always)]
81fn is_leader_claim_message(msg: &Message, target_channel_id: u64) -> bool {
82    match &msg.detail {
83        MsgDetail::Index(_) | MsgDetail::Declare(_) | MsgDetail::Append(_)
84            if target_channel_id == msg.channel_id =>
85        {
86            true
87        }
88        _ => false,
89    }
90}
91
92/// Read the config stage of optional PendingConfigs, if the optional value is
93/// None, return ConfigStage::New.
94#[inline(always)]
95fn unwrap_config_stage(pending_configs: &Option<PendingConfigs>) -> ConfigStage {
96    pending_configs
97        .as_ref()
98        .map(|p| p.stage)
99        .unwrap_or(ConfigStage::New)
100}
101
102/// Is both optional pending configs comes from the same configuration log.
103#[inline(always)]
104fn is_same_pending_configs(lhs: &Option<PendingConfigs>, rhs: &Option<PendingConfigs>) -> bool {
105    match (lhs, rhs) {
106        (Some(l), Some(r)) => l.eq(r),
107        _ => false,
108    }
109}
110
111#[inline(always)]
112fn active_members(descs: &HashMap<u64, MemberState>) -> Vec<u64> {
113    // FIXME(patrick) check applied
114    descs.iter().map(|(id, _d)| *id).collect::<Vec<_>>()
115}
116
117#[inline(always)]
118fn filter_index_channel(channel_ids: &Vec<u64>) -> HashSet<u64> {
119    channel_ids
120        .iter()
121        .cloned()
122        .filter(|id| *id != INDEX_CHANNEL_ID)
123        .collect::<HashSet<_>>()
124}
125
/// Operations used to control the inner state of sdcons.
pub enum Control {
    /// Removes the indexes and entries that have already been applied from
    /// the memory storage, releasing their memory.
    ReleaseMemory,
    /// Transfers the current leadership to the specified node.
    ///
    /// REQUIRES that no leadership transfer is already in progress.
    TransferLeader(u64),
    /// Makes the lease of the index follower expire immediately, so that the
    /// leader of a new term is elected.
    TimeoutNow,
    /// Notifies the state machine to generate a snapshot.
    Checkpoint,
}
141
142/// A helper struct contains all infos about how to create a message.
143///
144/// User should read the term of previous log (id is `from_id()-1`), the logs
145/// between range `[from_id(), until_id()]`, and provides those infos to
146/// MsgBuilder to build pending messages.  After that, user is to send this
147/// message to `remote_id()`.
148///
149/// **WARNING**:
150/// User should check the value of `channel_id()` to determine whether invoke
151/// whick build function.  If the value of is `INDEX_CHANNEL_ID`, invokes
152/// `build_index_msg()`, otherwise invokes `build_append_msg()`.
153#[derive(Debug)]
154pub struct MsgBuilder {
155    from: u64,
156    to: u64,
157    index_term: u64,
158    channel_id: u64,
159    channel_term: u64,
160    committed_id: u64,
161    first_id: u64,
162    until_id: u64,
163}
164
165impl MsgBuilder {
166    /// The node this pending message send from.
167    pub fn local_id(&self) -> u64 {
168        self.from
169    }
170
171    /// The node this pending message send to.
172    pub fn remote_id(&self) -> u64 {
173        self.to
174    }
175
176    /// The channel this message belongs to.
177    pub fn channel_id(&self) -> u64 {
178        self.channel_id
179    }
180
181    /// The first log id need to read.
182    pub fn from_id(&self) -> u64 {
183        self.first_id
184    }
185
186    /// The last log id need to read.
187    pub fn until_id(&self) -> u64 {
188        self.until_id
189    }
190
191    pub fn build_append_msg(builder: &Self, prev_term: u64, entries: Vec<Entry>) -> Message {
192        assert_ne!(builder.channel_id, INDEX_CHANNEL_ID);
193        Message {
194            from: builder.from,
195            to: builder.to,
196            index_term: builder.index_term,
197            channel_id: builder.channel_id,
198            channel_term: builder.channel_term,
199            detail: MsgDetail::Append(AppendMsg {
200                committed_entry_id: builder.committed_id,
201                prev_entry_id: builder.first_id - 1u64,
202                prev_entry_term: prev_term,
203                entries,
204            }),
205        }
206    }
207
208    pub fn build_index_msg(builder: &Self, prev_term: u64, indexes: Vec<LogIndex>) -> Message {
209        assert_eq!(builder.channel_id, INDEX_CHANNEL_ID);
210        Message {
211            from: builder.from,
212            to: builder.to,
213            index_term: builder.index_term,
214            channel_id: builder.channel_id,
215            channel_term: builder.channel_term,
216            detail: MsgDetail::Index(IndexMsg {
217                committed_index_id: builder.committed_id,
218                prev_index_id: builder.first_id - 1u64,
219                prev_index_term: prev_term,
220                indexes,
221            }),
222        }
223    }
224}
225
226#[derive(Debug)]
227struct RemoteSnapshotRecevingStates {
228    id: u64,
229    states: HashMap<u64, SnapshotRecevingState>,
230}
231
232impl RemoteSnapshotRecevingStates {
233    fn new(id: u64) -> RemoteSnapshotRecevingStates {
234        RemoteSnapshotRecevingStates {
235            id,
236            states: HashMap::new(),
237        }
238    }
239
240    fn is_any_remote_receiving_snapshot(&self) -> bool {
241        self.states.iter().map(|(_, v)| v.receiving).count() > 0
242    }
243
244    fn is_remote_receiving_snapshot(&self, remote_id: u64) -> bool {
245        self.states
246            .get(&remote_id)
247            .map(|s| s.receiving)
248            .unwrap_or(false)
249    }
250
251    fn get_mut_remote_snapshot_state(&mut self, remote_id: u64) -> &mut SnapshotRecevingState {
252        self.states
253            .entry(remote_id)
254            .or_insert(SnapshotRecevingState {
255                receiving: false,
256                start_at: 0,
257                last_heartbeat_at: 0,
258            })
259    }
260
261    fn update_remote_snapshot_state(&mut self, remote_id: u64, receiving: bool) {
262        let e = self.get_mut_remote_snapshot_state(remote_id);
263        e.receiving = receiving;
264        e.last_heartbeat_at = Local::now().timestamp_nanos();
265        e.start_at = Local::now().timestamp_nanos();
266    }
267
268    fn maybe_update_remote_snapshot_state(&mut self, remote_id: u64, is_receving: bool) {
269        let id = self.id;
270        let e = self.get_mut_remote_snapshot_state(remote_id);
271        if e.receiving {
272            e.last_heartbeat_at = Local::now().timestamp_nanos();
273            if !is_receving && e.last_heartbeat_at - e.start_at >= 1000 * 1000 * 1000 {
274                // 1s
275                info!("node {} remote {} receiving snapshot timeout", id, remote_id);
276                e.receiving = false;
277            }
278        } else if is_receving {
279            e.receiving = true;
280            e.last_heartbeat_at = Local::now().timestamp_nanos();
281            e.start_at = Local::now().timestamp_nanos();
282        }
283    }
284}
285
286/// Keeps the infos used about leadership transfering.
287#[derive(Debug)]
288struct TransferingRecord {
289    /// The target node who leadership is transfering.
290    pub to: u64,
291
292    /// The channel where the transfering leadership.
293    pub channel_id: u64,
294
295    /// The number of elapsed ticks which the transfering progress passed.
296    pub elapsed_tick: usize,
297}
298
299impl Default for TransferingRecord {
300    fn default() -> TransferingRecord {
301        TransferingRecord {
302            to: INVALID_NODE_ID,
303            channel_id: INDEX_CHANNEL_ID,
304            elapsed_tick: 0,
305        }
306    }
307}
308
309impl TransferingRecord {
310    fn new(channel_id: u64, to: u64) -> TransferingRecord {
311        TransferingRecord {
312            to,
313            channel_id,
314            elapsed_tick: 0,
315        }
316    }
317
318    /// Is there exists a leadership transfering?
319    fn doing(&self) -> bool {
320        self.to != INVALID_NODE_ID
321    }
322
323    /// Update leadership transfering elapsed tick.
324    fn elapsed(&mut self) {
325        if self.to != INVALID_NODE_ID {
326            self.elapsed_tick += 1;
327        }
328    }
329
330    /// Is this leadership transfering timeouted.
331    fn timeout(&self) -> bool {
332        // FIXME(patrick) use base_election_timeout_tick
333        self.to != INVALID_NODE_ID && self.elapsed_tick >= 3
334    }
335
336    fn abort(&mut self) {
337        self.reset(INVALID_NODE_ID, INDEX_CHANNEL_ID);
338    }
339
340    fn reset(&mut self, to: u64, channel_id: u64) {
341        self.to = to;
342        self.channel_id = channel_id;
343        self.elapsed_tick = 0;
344    }
345
346    // Try start the transfering progress, return false if there exists an
347    // transfering and they targes are differents.
348    fn initiate(&mut self, channel_id: u64, to: u64) -> bool {
349        if self.to != INVALID_NODE_ID {
350            self.channel_id == channel_id && self.to == to
351        } else {
352            self.reset(to, channel_id);
353            true
354        }
355    }
356
357    fn target_to(&self, channel_id: u64, to: u64) -> bool {
358        self.to == to && self.channel_id == channel_id
359    }
360}
361
362/// Keeps the infos used about membership changing.
363#[derive(Debug)]
364struct PendingConfigs {
365    stage: ConfigStage,
366
367    channel_id: u64,
368    term: u64,
369
370    /// The index id of both-stage entry, pending configs will enter new-stage
371    /// if this index_id is chosen.
372    index_id: u64,
373    entry_id: u64,
374
375    /// Target members proposed in configuration log.
376    configs: HashSet<u64>,
377
378    /// In new stages, but not contains in old stage.
379    new_configs: HashSet<u64>,
380    /// In old stages, but not contains in new stage.
381    old_configs: HashSet<u64>,
382}
383
384impl PartialEq for PendingConfigs {
385    fn eq(&self, other: &PendingConfigs) -> bool {
386        // Two membership changing proposal is samed, iff the has same indxe id, term
387        // and proposed by the same channel.
388        if self.index_id == INVALID_ID || other.index_id == INVALID_ID {
389            return false;
390        }
391        self.channel_id == other.channel_id
392            && self.index_id == other.index_id
393            && self.term == other.term
394    }
395}
396
397impl PendingConfigs {
398    fn new(
399        channel_id: u64,
400        index_id: u64,
401        entry_id: u64,
402        term: u64,
403        stage: ConfigStage,
404        configs: HashSet<u64>,
405        old_members: &HashSet<u64>,
406    ) -> PendingConfigs {
407        let new_configs = configs
408            .iter()
409            .filter(|v| !old_members.contains(v))
410            .cloned()
411            .collect::<HashSet<_>>();
412        let old_configs = old_members
413            .iter()
414            .filter(|id| !configs.contains(*id) && **id != INDEX_CHANNEL_ID)
415            .cloned()
416            .collect::<HashSet<_>>();
417        PendingConfigs {
418            stage,
419            channel_id,
420            term,
421            index_id,
422            entry_id,
423            configs,
424            new_configs,
425            old_configs,
426        }
427    }
428
429    fn desc(&self) -> ChangeConfig {
430        ChangeConfig {
431            index_id: self.index_id,
432            entry_id: self.entry_id,
433            term: self.term,
434            stage: self.stage,
435            members: self.configs.clone(),
436        }
437    }
438
439    fn member_stage(&self, id: u64) -> MemberState {
440        MemberState {
441            applied: false,
442            stage: if self.old_configs.contains(&id) {
443                ConfigStage::Old
444            } else if self.new_configs.contains(&id) {
445                ConfigStage::New
446            } else {
447                ConfigStage::Both
448            },
449        }
450    }
451
452    fn get_member_stages(&self) -> HashMap<u64, MemberState> {
453        self.configs
454            .iter()
455            .map(|id| (*id, self.member_stage(*id)))
456            .collect()
457    }
458}
459
460#[derive(Debug)]
461pub(crate) struct Sdcons {
462    option: SdconsOption,
463    random_election_timeout_tick: u32,
464
465    id: u64,
466    log_buffer: MemStorage,
467    channel_ids: Vec<u64>,
468    channels: HashMap<u64, ChannelInfo>,
469    pending_configs: Option<PendingConfigs>,
470
471    next_read_request_id: u64,
472    pending_reads: VecDeque<u64>,
473    undoing_reads: Vec<u64>,
474
475    choosen_id: u64,
476    pending_id: u64,
477    applied_id: u64,
478
479    transfer: TransferingRecord,
480    snapshot_state: SnapshotState,
481    snapshot_receving_states: RemoteSnapshotRecevingStates,
482
483    ready: Ready,
484}
485
486/// The struct `Ready` is mainly used to buffer the output generated by the
487/// state machine after receiving request, messages and tick.  The `advance()`
488/// function will take the value of `Ready`.
489#[derive(Debug)]
490pub(crate) struct Ready {
491    // The range of chosen entries [first, last].  If there no any chosen entries,
492    // last_apply_index_id + 1 = first_apply_index_id.
493    pub first_apply_index_id: u64,
494    pub last_apply_index_id: u64,
495    pub chosen_entries: Vec<Entry>,
496
497    pub unstable_indexes: Vec<LogIndex>,
498    pub unstable_entries: Vec<Entry>,
499
500    pub msgs: HashMap<u64, Vec<Message>>,
501    pub cop_msgs: Option<Vec<MsgBuilder>>,
502
503    /// The field `post_apply` indicates that the entries in field
504    /// `chosen_entries` needs to be submitted to the applier after the metas is
505    /// persisted.
506    ///
507    /// The field is mainly used in some special scenarios.  In these scenarios,
508    /// the user needs to ensure that the committed ids is persisted before
509    /// applying logs.
510    pub post_apply: bool,
511    pub should_checkpoint: bool,
512    pub should_stable_metas: bool,
513    pub pending_snapshot: Option<SnapshotDesc>,
514
515    pub roles: HashMap<u64, Role>,
516    pub hard_states: HashMap<u64, HardState>,
517    pub member_states: HashMap<u64, MemberState>,
518    pub committed_stats: HashMap<u64, u64>,
519
520    pub finished_reads: HashMap<u64, u64>, // request_id -> committed_id,
521}
522
523impl Default for Ready {
524    fn default() -> Self {
525        Ready {
526            first_apply_index_id: INVALID_ID,
527            last_apply_index_id: INVALID_ID,
528            should_checkpoint: false,
529            should_stable_metas: false,
530            post_apply: false,
531            msgs: HashMap::new(),
532            unstable_indexes: Vec::new(),
533            unstable_entries: Vec::new(),
534            chosen_entries: Vec::new(),
535            cop_msgs: None,
536            pending_snapshot: None,
537            roles: HashMap::new(),
538            hard_states: HashMap::new(),
539            member_states: HashMap::new(),
540            committed_stats: HashMap::new(),
541            finished_reads: HashMap::new(),
542        }
543    }
544}
545
546impl Ready {
547    fn send_msg(&mut self, msg: Message) {
548        assert_ne!(msg.to, INVALID_NODE_ID);
549        assert_ne!(msg.to, INDEX_CHANNEL_ID);
550        self.msgs
551            .entry(msg.channel_id)
552            .or_insert(Vec::new())
553            .push(msg);
554    }
555
556    fn send_cop_msg(&mut self, msg_builder: MsgBuilder) {
557        assert_ne!(msg_builder.to, INDEX_CHANNEL_ID);
558        assert_ne!(msg_builder.to, INVALID_NODE_ID);
559        if let Some(v) = &mut self.cop_msgs {
560            v.push(msg_builder);
561        } else {
562            self.cop_msgs = Some(vec![msg_builder])
563        }
564    }
565}
566
567impl Sdcons {
568    fn assign_index(&mut self, channel_id: u64, entry_id: u64) -> u64 {
569        assert_ne!(channel_id, INDEX_CHANNEL_ID);
570        let index = LogIndex {
571            channel_id,
572            entry_id,
573            index_id: 0,
574            term: self.get_channel_term(INDEX_CHANNEL_ID),
575            context: None,
576        };
577        let index_id = self.log_buffer.allocate(index);
578        debug!(
579            "node {} channel {} assign channel {} entry {} index {}",
580            self.id, INDEX_CHANNEL_ID, channel_id, entry_id, index_id
581        );
582        index_id
583    }
584
585    fn is_index_leader(&self) -> bool {
586        self.is_channel_leader(INDEX_CHANNEL_ID)
587    }
588
589    fn is_index_student(&self) -> bool {
590        self.channels
591            .get(&INDEX_CHANNEL_ID)
592            .map(|i| i.is_student())
593            .unwrap_or(false)
594    }
595
596    fn is_channel_leader(&self, channel_id: u64) -> bool {
597        self.channels
598            .get(&channel_id)
599            .map(|i| i.is_leader())
600            .unwrap_or(false)
601    }
602
603    fn is_channel_candidate(&self, channel_id: u64) -> bool {
604        self.channels
605            .get(&channel_id)
606            .map(|i| i.is_candidate())
607            .unwrap_or(false)
608    }
609
610    fn is_channel_follower(&self, channel_id: u64) -> bool {
611        self.channels
612            .get(&channel_id)
613            .map(|i| i.is_follower())
614            .unwrap_or(false)
615    }
616
617    fn is_local_channel_leader(&self) -> bool {
618        self.is_channel_leader(self.id)
619    }
620
621    fn local_entry_channel_term(&self) -> u64 {
622        self.get_channel_term(self.id)
623    }
624
625    fn next_entry_id(&self) -> u64 {
626        self.channel_next_entry_id(self.id)
627    }
628
629    fn channel_next_entry_id(&self, channel_id: u64) -> u64 {
630        self.log_buffer.channel_last_entry_id(channel_id) + 1u64
631    }
632
633    fn remote_ids(&self) -> Vec<u64> {
634        self.channel_ids
635            .iter()
636            .map(|k| *k)
637            .filter(|k| *k != self.id && *k != INDEX_CHANNEL_ID)
638            .collect::<Vec<_>>()
639    }
640
641    fn get_member_states(&self) -> HashMap<u64, MemberState> {
642        if let Some(pending_configs) = &self.pending_configs {
643            match pending_configs.stage {
644                ConfigStage::New | ConfigStage::Both => {
645                    return pending_configs.get_member_stages();
646                }
647                _ => {}
648            }
649        }
650        self.get_index_channel().get_member_states()
651    }
652
653    fn build_channel_if_not_exists(&mut self, channel_id: u64) {
654        if !self.channels.contains_key(&channel_id) {
655            self.channels.insert(
656                channel_id,
657                ChannelInfo::build(self.id, channel_id, self.get_member_states()),
658            );
659        }
660    }
661
662    fn build_msg_header(&self, channel_id: u64) -> Message {
663        let index_term = self.get_index_term();
664        let channel_term = if channel_id == INDEX_CHANNEL_ID {
665            index_term
666        } else {
667            self.get_channel_term(channel_id)
668        };
669        Message {
670            from: self.id,
671            to: INVALID_NODE_ID,
672            index_term,
673            channel_id,
674            channel_term,
675            detail: MsgDetail::None,
676        }
677    }
678
679    fn append_entry(&mut self, channel_id: u64, entry: Entry) {
680        self.log_buffer.append_entry(channel_id, entry)
681    }
682
683    fn advance_choose_progress(&mut self) {
684        // Only advance chosen index to matched id.
685        let committed_map = self
686            .channels
687            .iter()
688            .map(|(id, c)| (*id, c.get_local_safety_commit_id()))
689            .collect::<HashMap<_, _>>();
690        self.log_buffer.advance_chosen_index_id(committed_map);
691    }
692
693    fn bcast_index_channel<T>(&mut self, log_meta_view: &T)
694    where
695        T: LogMetaView,
696    {
697        let msg_header = self.build_msg_header(INDEX_CHANNEL_ID);
698        let mem_first_id = self.log_buffer.channel_first_entry_id(INDEX_CHANNEL_ID);
699        let mem_last_id = self.log_buffer.channel_last_entry_id(INDEX_CHANNEL_ID);
700        let channel_info = self.channels.get_mut(&INDEX_CHANNEL_ID).unwrap();
701        if !channel_info.is_leader() {
702            return;
703        }
704
705        let config_stage = unwrap_config_stage(&self.pending_configs);
706        let advanced = channel_info.try_advance_committed_id(config_stage);
707        let committed_index_id = channel_info.committed_id;
708        for (id, progress) in &mut channel_info.progress_map {
709            if *id == self.id
710                || progress.is_paused()
711                || self
712                    .snapshot_receving_states
713                    .is_remote_receiving_snapshot(*id)
714            {
715                continue;
716            }
717
718            if progress.next_id <= mem_first_id {
719                let (stabled_first_id, _) = log_meta_view.range_of(INDEX_CHANNEL_ID);
720                if progress.next_id < stabled_first_id {
721                    if let SnapshotState::Creating = self.snapshot_state {
722                        continue;
723                    } else if let Some(desc) = log_meta_view.latest_snapshot() {
724                        debug!(
725                            "node {} channel {} replicate snapshot to {}",
726                            self.id, INDEX_CHANNEL_ID, *id
727                        );
728                        let mut msg = msg_header.clone();
729                        msg.to = *id;
730                        msg.detail = MsgDetail::Snapshot(desc);
731                        self.ready.send_msg(msg);
732                        self.snapshot_receving_states
733                            .update_remote_snapshot_state(*id, true);
734                    } else {
735                        debug!("node {} channel {} try create snapshot", self.id, INDEX_CHANNEL_ID);
736                        self.snapshot_state = SnapshotState::Creating;
737                        self.ready.should_checkpoint = true;
738                    }
739                } else {
740                    let builder = MsgBuilder {
741                        from: self.id,
742                        to: *id,
743                        index_term: msg_header.index_term,
744                        channel_id: msg_header.channel_id,
745                        channel_term: msg_header.channel_term,
746                        committed_id: committed_index_id,
747                        first_id: progress.next_id,
748                        until_id: mem_first_id,
749                    };
750                    debug!(
751                        "node {} channel {} send build msg: {:?}",
752                        self.id, INDEX_CHANNEL_ID, builder
753                    );
754                    self.ready.send_cop_msg(builder);
755                    progress.step_building_msg();
756                }
757            } else if progress.next_id <= mem_last_id || advanced {
758                let prev_index_id = progress.next_id - 1u64;
759                debug!(
760                    "node {} channel {} replicate indexes[{}, {}) to {}, advance {}",
761                    self.id,
762                    INDEX_CHANNEL_ID,
763                    progress.next_id,
764                    mem_last_id + 1u64,
765                    *id,
766                    advanced
767                );
768                let prev_index_term = self
769                    .log_buffer
770                    .index_term(prev_index_id)
771                    .unwrap_or(INITIAL_TERM);
772                let indexes = self.log_buffer.index_range(progress.next_id, mem_last_id);
773                progress.replicate_to(mem_last_id);
774                let mut msg = msg_header.clone();
775                msg.to = *id;
776                msg.detail = MsgDetail::Index(IndexMsg {
777                    committed_index_id,
778                    prev_index_id,
779                    prev_index_term,
780                    indexes,
781                });
782                self.ready.send_msg(msg);
783            }
784        }
785    }
786
787    fn bcast_entry_channel<T>(&mut self, channel_id: u64, log_meta_view: &T)
788    where
789        T: LogMetaView,
790    {
791        let msg_header = self.build_msg_header(channel_id);
792        let first_id = self.log_buffer.channel_first_entry_id(channel_id);
793        let last_id = self.log_buffer.channel_last_entry_id(channel_id);
794        let channel_info = self.channels.get_mut(&channel_id).unwrap();
795        if !channel_info.is_leader() {
796            return;
797        }
798        let config_stage = unwrap_config_stage(&self.pending_configs);
799        let advanced = channel_info.try_advance_committed_id(config_stage);
800        let committed_entry_id = channel_info.committed_id;
801        for (id, progress) in &mut channel_info.progress_map {
802            if *id == self.id
803                || progress.is_paused()
804                || self
805                    .snapshot_receving_states
806                    .is_remote_receiving_snapshot(*id)
807            {
808                continue;
809            }
810
811            if progress.next_id <= first_id {
812                let (stabled_first_id, _) = log_meta_view.range_of(INDEX_CHANNEL_ID);
813                if progress.next_id < stabled_first_id {
814                    if let SnapshotState::Creating = self.snapshot_state {
815                        continue;
816                    } else if let Some(desc) = log_meta_view.latest_snapshot() {
817                        debug!(
818                            "node {} channel {} replicate snapshot to {}",
819                            self.id, channel_id, *id
820                        );
821                        let mut msg = msg_header.clone();
822                        msg.to = *id;
823                        msg.detail = MsgDetail::Snapshot(desc);
824                        self.ready.send_msg(msg);
825                        self.snapshot_receving_states
826                            .update_remote_snapshot_state(*id, true);
827                    } else {
828                        debug!("node {} channel {} try create snapshot", self.id, channel_id);
829                        self.snapshot_state = SnapshotState::Creating;
830                        self.ready.should_checkpoint = true;
831                    }
832                } else {
833                    let builder = MsgBuilder {
834                        from: self.id,
835                        to: *id,
836                        index_term: msg_header.index_term,
837                        channel_id: msg_header.channel_id,
838                        channel_term: msg_header.channel_term,
839                        committed_id: committed_entry_id,
840                        first_id: progress.next_id,
841                        until_id: first_id,
842                    };
843                    debug!("node {} channel {} send build msg: {:?}", self.id, channel_id, builder);
844                    self.ready.send_cop_msg(builder);
845                    progress.step_building_msg();
846                }
847            } else if progress.next_id <= last_id || advanced {
848                let prev_entry_id = progress.next_id - 1u64;
849                debug!(
850                    "node {} channel {} replicate entries[{}, {}) to {}, advance {}",
851                    self.id,
852                    channel_id,
853                    progress.next_id,
854                    last_id + 1u64,
855                    *id,
856                    advanced
857                );
858                let prev_entry_term = self
859                    .log_buffer
860                    .entry_term(channel_id, prev_entry_id)
861                    .unwrap_or(INITIAL_TERM);
862                let entries = self
863                    .log_buffer
864                    .range_of(channel_id, progress.next_id, last_id);
865                debug!("node {} channel {} entries is {:?}", self.id, channel_id, entries);
866                progress.replicate_to(last_id);
867                let mut msg = msg_header.clone();
868                msg.to = *id;
869                msg.detail = MsgDetail::Append(AppendMsg {
870                    committed_entry_id,
871                    prev_entry_id,
872                    prev_entry_term,
873                    entries,
874                });
875                self.ready.send_msg(msg);
876            }
877        }
878    }
879
880    fn extract_chosen_entries(&mut self) {
881        let (first_index_id, last_index_id, entries) = self
882            .log_buffer
883            .extract_unapply_entries(self.option.max_apply_bytes);
884        self.ready.first_apply_index_id = first_index_id;
885        self.ready.last_apply_index_id = last_index_id;
886        self.ready.chosen_entries = entries;
887    }
888
889    fn extract_unstable_entries(&mut self) {
890        self.ready.unstable_entries = self
891            .log_buffer
892            .extract_unstable_entries(self.option.max_stable_bytes);
893        self.ready.unstable_indexes = self.log_buffer.extract_unstable_indexes();
894    }
895
896    fn bcast_each_channels<L>(&mut self, log_meta_view: &L)
897    where
898        L: LogMetaView,
899    {
900        // FIXME(patrick) In the current design, different channels have different
901        // progress. If one progress is paused, the progress of other channels cannot be
902        // sensed temporarily.
903        for idx in 0..self.channel_ids.len() {
904            let channel_id = self.channel_ids[idx];
905            if channel_id == INDEX_CHANNEL_ID {
906                self.bcast_index_channel(log_meta_view);
907            } else {
908                self.bcast_entry_channel(channel_id, log_meta_view);
909            }
910        }
911    }
912
913    fn collect_state_snapshot(&mut self) {
914        self.ready.hard_states = self
915            .channels
916            .iter()
917            .map(|(i, c)| (*i, c.hard_state()))
918            .collect::<HashMap<_, _>>();
919        self.ready.roles = self
920            .channels
921            .iter()
922            .map(|(i, c)| (*i, c.role))
923            .collect::<HashMap<_, _>>();
924        self.ready.committed_stats = self
925            .channels
926            .iter()
927            .map(|(i, c)| (*i, c.get_local_safety_commit_id()))
928            .collect::<HashMap<_, _>>();
929        self.ready.member_states = self.get_member_states();
930    }
931
932    fn force_remote_step_down(
933        &mut self,
934        to: u64,
935        old_term: u64,
936        channel_id: u64,
937        detail: &MsgDetail,
938    ) {
939        debug!("node {} channel {} force remote {} step down because: receive staled {} msg, old term: {}",
940            self.id, channel_id, to, detail, old_term);
941        let mut msg_header = self.build_msg_header(channel_id);
942        msg_header.to = to;
943        self.ready.send_msg(msg_header);
944    }
945
946    fn reject_staled_message(&mut self, msg: &Message) -> bool {
947        if msg.index_term < self.get_channel_term(INDEX_CHANNEL_ID) {
948            self.force_remote_step_down(msg.from, msg.index_term, INDEX_CHANNEL_ID, &msg.detail);
949            true
950        } else if msg.channel_id != INDEX_CHANNEL_ID
951            && msg.channel_term < self.get_channel_term(msg.channel_id)
952        {
953            self.force_remote_step_down(msg.from, msg.channel_term, msg.channel_id, &msg.detail);
954            true
955        } else {
956            false
957        }
958    }
959
960    fn maybe_transfering_finised(&mut self, channel_id: u64, from: u64) {
961        if self.transfer.target_to(channel_id, from) {
962            info!(
963                "node {} channel {} transfer leadership to {} success",
964                self.id, channel_id, from
965            );
966            self.transfer = TransferingRecord::default();
967        }
968    }
969
970    fn try_advance_channel_term(&mut self, msg: &Message) {
971        let mut active_channels = vec![(INDEX_CHANNEL_ID, msg.index_term)];
972        if msg.channel_id != INDEX_CHANNEL_ID {
973            active_channels.push((msg.channel_id, msg.channel_term));
974        }
975
976        let mut advanced = false;
977        for (channel_id, term) in active_channels {
978            let target_leader_id = if is_leader_claim_message(msg, channel_id) {
979                msg.from
980            } else {
981                INVALID_NODE_ID
982            };
983            self.build_channel_if_not_exists(channel_id);
984            let last_id = self.log_buffer.channel_last_entry_id(channel_id);
985            let channel_info = self.channels.get_mut(&channel_id).unwrap();
986            if channel_info.term < term {
987                channel_info.to_follower(target_leader_id, term, last_id, "high-term");
988                self.maybe_transfering_finised(channel_id, msg.from);
989                advanced = true;
990            } else if (channel_info.is_candidate() || channel_info.has_leader())
991                && target_leader_id != INVALID_NODE_ID
992            {
993                channel_info.to_follower(target_leader_id, term, last_id, "claim-leader");
994                advanced = true;
995            }
996        }
997
998        if advanced {
999            // Advance pending read requests when leader is changed.
1000            self.submit_read_task();
1001        }
1002    }
1003
1004    fn get_index_channel(&self) -> &ChannelInfo {
1005        self.channels.get(&INDEX_CHANNEL_ID).unwrap()
1006    }
1007
1008    fn get_index_term(&self) -> u64 {
1009        self.channels.get(&INDEX_CHANNEL_ID).unwrap().term
1010    }
1011
1012    fn get_channel_term(&self, channel_id: u64) -> u64 {
1013        self.channels
1014            .get(&channel_id)
1015            .map(|c| c.term)
1016            .unwrap_or(INITIAL_TERM)
1017    }
1018
1019    fn maybe_assign_indexes(&mut self, channel_id: u64, first_entry_id: u64, last_entry_id: u64) {
1020        // leader order this entries and bcast to members.
1021        if self.is_index_leader() {
1022            let first_entry_id = std::cmp::max(
1023                self.log_buffer.channel_last_assigned_entry_id(channel_id) + 1u64,
1024                first_entry_id,
1025            );
1026            for idx in first_entry_id..(last_entry_id + 1) {
1027                self.assign_index(channel_id, idx);
1028            }
1029        }
1030    }
1031
1032    fn abort_pending_configs(&mut self, reason: &str) {
1033        assert_eq!(self.pending_configs.is_some(), true);
1034        self.ready.should_stable_metas = true;
1035        let pending_configs = self.pending_configs.as_ref().unwrap();
1036        info!(
1037            "node {} channel {} because {}, abort and rollback {:?}",
1038            self.id,
1039            pending_configs.channel_id,
1040            reason,
1041            pending_configs.desc()
1042        );
1043        for (_channe_id, channel_info) in &mut self.channels {
1044            channel_info
1045                .rollback_config_change(&pending_configs.new_configs, &pending_configs.old_configs);
1046        }
1047    }
1048
1049    fn maybe_update_config_change_stage(
1050        &mut self,
1051        channel_id: u64,
1052        first_entry_id: u64,
1053        _last_entry_id: u64,
1054    ) {
1055        assert_eq!(self.is_channel_follower(channel_id), true);
1056
1057        // The log entries might is truncated by channel's leader, so try to abort
1058        // pending_configs.
1059        if let Some(p) = &self.pending_configs {
1060            if p.channel_id == channel_id && first_entry_id <= p.entry_id {
1061                self.abort_pending_configs(&"log entries was truncated");
1062            }
1063        }
1064
1065        let entry = match self.log_buffer.channel_last_config_change_entry(channel_id) {
1066            Some(e) if e.entry_id >= first_entry_id => e,
1067            _ => return,
1068        };
1069
1070        let config_change = entry.configs.unwrap();
1071        info!("node {} channel {} recieve {:?}", self.id, channel_id, config_change);
1072
1073        let old_members = self.channel_ids.iter().cloned().collect::<HashSet<_>>();
1074        self.pending_configs = Some(PendingConfigs::new(
1075            entry.channel_id,
1076            config_change.index_id,
1077            config_change.entry_id,
1078            config_change.term,
1079            config_change.stage,
1080            config_change.members,
1081            &old_members,
1082        ));
1083
1084        self.update_channels_config_stage();
1085    }
1086
1087    fn handle_append(&mut self, from: u64, channel_id: u64, msg: AppendMsg) {
1088        debug!(
1089            "node {} channel {} receive append from {}: {:?}",
1090            self.id, channel_id, from, msg
1091        );
1092
1093        assert_ne!(channel_id, INDEX_CHANNEL_ID);
1094        let mut reply_msg = AppendReplyMsg {
1095            reject: false,
1096            entry_id: msg.prev_entry_id + 1u64,
1097            hint_id: 0,
1098        };
1099        let length = msg.entries.len();
1100        let committed_entry_id = self.channels.get(&channel_id).unwrap().committed_id;
1101        if msg.prev_entry_id < committed_entry_id {
1102            reply_msg.reject = false;
1103            reply_msg.hint_id = committed_entry_id + 1u64;
1104        } else if self.log_buffer.is_term_miss_matched(
1105            channel_id,
1106            msg.prev_entry_id,
1107            msg.prev_entry_term,
1108        ) {
1109            let reject_entry_id =
1110                self.log_buffer
1111                    .find_conflict(channel_id, msg.prev_entry_id, msg.prev_entry_term);
1112            reply_msg.reject = true;
1113            reply_msg.hint_id = reject_entry_id;
1114            self.channels.get_mut(&channel_id).unwrap().reset_tick();
1115        } else {
1116            let (first_entry_id, last_entry_id) =
1117                self.log_buffer.append_entries(channel_id, msg.entries);
1118            reply_msg.reject = false;
1119            reply_msg.hint_id = last_entry_id + 1;
1120            self.maybe_update_config_change_stage(channel_id, first_entry_id, last_entry_id);
1121            self.maybe_assign_indexes(channel_id, first_entry_id, last_entry_id);
1122            let channel_info = self.channels.get_mut(&channel_id).unwrap();
1123            channel_info.update_committed_id(std::cmp::min(last_entry_id, msg.committed_entry_id));
1124            channel_info.reset_tick();
1125        }
1126
1127        debug!(
1128            "node {} channel {} receive append from {}, prev id {}, prev term {}, length {}, reply: {:?}",
1129            self.id, channel_id, from,
1130            msg.prev_entry_id, msg.prev_entry_term, length,
1131            reply_msg
1132        );
1133        let mut msg_header = self.build_msg_header(channel_id);
1134        msg_header.to = from;
1135        msg_header.detail = MsgDetail::AppendReply(reply_msg);
1136        self.ready.send_msg(msg_header);
1137    }
1138
1139    fn handle_append_reply(&mut self, from: u64, channel_id: u64, msg: AppendReplyMsg) {
1140        debug!("node {} channel {} receive from {}: {:?}", self.id, channel_id, from, msg);
1141        match self.channels.get_mut(&channel_id) {
1142            Some(c) => c.update_progress(from, msg.reject, msg.entry_id, msg.hint_id),
1143            None => {}
1144        };
1145    }
1146
1147    fn handle_index(&mut self, from: u64, msg: IndexMsg) {
1148        info!(
1149            "node {} channel {} receive index msg from {}: {:?}",
1150            self.id, INDEX_CHANNEL_ID, from, msg
1151        );
1152        let mut reply_msg = IndexReplyMsg {
1153            reject: false,
1154            index_id: msg.prev_index_id + 1u64,
1155            hint_id: 0,
1156        };
1157        let length = msg.indexes.len();
1158        let committed_index_id = self.channels.get(&INDEX_CHANNEL_ID).unwrap().committed_id;
1159        if msg.prev_index_id < committed_index_id {
1160            reply_msg.reject = false;
1161            reply_msg.hint_id = committed_index_id + 1u64;
1162        } else if self.log_buffer.is_term_miss_matched(
1163            INDEX_CHANNEL_ID,
1164            msg.prev_index_id,
1165            msg.prev_index_term,
1166        ) {
1167            let reject_index_id = self
1168                .log_buffer
1169                .find_index_conflict(msg.prev_index_id, msg.prev_index_term);
1170            reply_msg.reject = true;
1171            reply_msg.hint_id = reject_index_id;
1172            self.channels
1173                .get_mut(&INDEX_CHANNEL_ID)
1174                .unwrap()
1175                .reset_tick();
1176        } else {
1177            let last_index_id = self.log_buffer.extend_indexes(msg.indexes);
1178            reply_msg.reject = false;
1179            reply_msg.hint_id = last_index_id + 1;
1180            let channel_info = self.channels.get_mut(&INDEX_CHANNEL_ID).unwrap();
1181            channel_info.update_committed_id(std::cmp::min(last_index_id, msg.committed_index_id));
1182            channel_info.reset_tick();
1183        }
1184
1185        debug!(
1186            "node {} channel {} receive index from {}, prev id {}, prev term {}, length {}, reply: {:?}",
1187            self.id, INDEX_CHANNEL_ID, from,
1188            msg.prev_index_id, msg.prev_index_term, length,
1189            reply_msg
1190        );
1191        let mut msg_header = self.build_msg_header(INDEX_CHANNEL_ID);
1192        msg_header.to = from;
1193        msg_header.detail = MsgDetail::IndexReply(reply_msg);
1194        self.ready.send_msg(msg_header);
1195    }
1196
1197    fn handle_index_reply(&mut self, from: u64, msg: IndexReplyMsg) {
1198        match self.channels.get_mut(&INDEX_CHANNEL_ID) {
1199            Some(c) => {
1200                c.update_progress(from, msg.reject, msg.index_id, msg.hint_id);
1201                if !msg.reject {
1202                    self.advance_transfer_progress(from);
1203                }
1204            }
1205            None => {}
1206        };
1207    }
1208
1209    fn is_refreshed_index(&self, id: u64, term: u64) -> bool {
1210        let last_index_id = self.log_buffer.last_index_id();
1211        let last_index_term = self.log_buffer.last_index_term();
1212        // Local last index term is less than remote last index term
1213        term > last_index_term
1214            // ... last index term is equals, but local last index id is less than remote last index id.
1215            || (term == last_index_term && id >= last_index_id)
1216    }
1217
1218    fn handle_vote(&mut self, from: u64, msg: VoteMsg) {
1219        let mut detail = VoteReplyMsg { reject: true };
1220        if self.is_refreshed_index(msg.last_index_id, msg.last_index_term) {
1221            detail.reject = !self
1222                .channels
1223                .get_mut(&INDEX_CHANNEL_ID)
1224                .unwrap()
1225                .try_make_promise(from);
1226        }
1227
1228        info!(
1229            "node {} channel {} receive remote {}: {:?}, reply reject: {}",
1230            self.id, INDEX_CHANNEL_ID, from, msg, detail.reject
1231        );
1232
1233        let mut msg_header = self.build_msg_header(INDEX_CHANNEL_ID);
1234        msg_header.to = from;
1235        msg_header.detail = MsgDetail::VoteReply(detail);
1236        self.ready.send_msg(msg_header);
1237    }
1238
1239    fn handle_vote_reply(&mut self, from: u64, msg: VoteReplyMsg) {
1240        info!(
1241            "node {} channel {} receive remote {}: {:?}",
1242            self.id, INDEX_CHANNEL_ID, from, msg
1243        );
1244        if !msg.reject {
1245            self.receive_granted_vote(from);
1246        }
1247    }
1248
1249    fn receive_granted_vote(&mut self, from: u64) {
1250        let config_stage = unwrap_config_stage(&self.pending_configs);
1251        let channel_info = self.channels.get_mut(&INDEX_CHANNEL_ID).unwrap();
1252        channel_info.receive_promise(from);
1253        if channel_info.receive_majority_promise(config_stage) {
1254            let last_index_id = self.log_buffer.last_index_id();
1255            channel_info.to_student(last_index_id, "granted");
1256            let missed_channel_ids = channel_info.missed_channel_ids();
1257            if missed_channel_ids.len() > 0 {
1258                // send prepare request
1259                debug!(
1260                    "node {} channel {} there have {} missed channel {:?} need to recovery",
1261                    self.id,
1262                    INDEX_CHANNEL_ID,
1263                    missed_channel_ids.len(),
1264                    missed_channel_ids
1265                );
1266                for channel_id in missed_channel_ids {
1267                    // Only send learned requset once, if this message is losted, the election
1268                    // progress is treated as failed.
1269                    self.bcast_prepare_request(channel_id, true);
1270                }
1271            } else {
1272                // There no any channel is missed, we could become a leader directly.
1273                debug!(
1274                    "node {} channel {} no any missed channel exists",
1275                    self.id, INDEX_CHANNEL_ID
1276                );
1277                self.all_channel_already_learned();
1278            }
1279        }
1280    }
1281
    /// Answers a `Prepare` request from `from` for `channel_id` with a
    /// `PrepareReply`.
    ///
    /// - Without `msg.learn`: the channel is created if needed, and the reply
    ///   is rejected unless the channel agrees to promise `from`
    ///   (`try_make_promise`).
    /// - With `msg.learn`: no promise is made, and the reply is never rejected.
    ///
    /// A non-rejected reply carries the metas of the local entries after
    /// `msg.committed_entry_id`, up to the channel's last entry id, as returned
    /// by `range_of_metas`. The reply echoes `msg.learn`.
    fn handle_prepare(&mut self, from: u64, channel_id: u64, msg: PrepareMsg) {
        let mut detail = PrepareReplyMsg {
            reject: false,
            learn: msg.learn,
            entry_metas: Vec::new(),
        };
        if !msg.learn {
            self.build_channel_if_not_exists(channel_id);
            detail.reject = !self
                .channels
                .get_mut(&channel_id)
                .unwrap()
                .try_make_promise(from);
        }

        if !detail.reject {
            let last_entry_id = self.log_buffer.channel_last_entry_id(channel_id);
            let first_entry_id = self.log_buffer.channel_first_entry_id(channel_id);
            let mut begin_entry_id = msg.committed_entry_id + 1u64;
            if begin_entry_id < first_entry_id {
                // No entry in range (msg.committed_entry_id, first_entry_id) is kept
                // locally, which means those entries have already been assigned indexes.
                // NOTE(review): this starts at `first_entry_id + 1`, skipping
                // `first_entry_id` itself. That is only right if
                // `channel_first_entry_id` names an entry that is itself never
                // reported (e.g. a placeholder). Confirm against the log buffer.
                // The adjusted start may also exceed `last_entry_id`; this assumes
                // `range_of_metas` tolerates an empty range.
                begin_entry_id = first_entry_id + 1u64;
            }
            if msg.committed_entry_id < last_entry_id {
                detail.entry_metas =
                    self.log_buffer
                        .range_of_metas(channel_id, begin_entry_id, last_entry_id);
            }
        }

        info!(
            "node {} channel {} receive remote {}: {:?}, reply {}",
            self.id, channel_id, from, msg, detail.reject
        );

        let mut msg_header = self.build_msg_header(channel_id);
        msg_header.to = from;
        msg_header.detail = MsgDetail::PrepareReply(detail);
        self.ready.send_msg(msg_header);
    }
1323
1324    fn handle_prepare_reply(&mut self, from: u64, channel_id: u64, msg: PrepareReplyMsg) {
1325        assert_ne!(channel_id, INDEX_CHANNEL_ID);
1326        debug!("node {} channel {} receive remote {}: {:?}", self.id, channel_id, from, msg);
1327        if msg.reject {
1328            return;
1329        }
1330
1331        self.receive_prepare_promise(from, channel_id, msg.learn, msg.entry_metas);
1332    }
1333
1334    fn handle_declare(&mut self, from: u64, channel_id: u64, msg: DeclareMsg) {
1335        let last_id = self.log_buffer.channel_last_entry_id(channel_id);
1336        debug!(
1337            "node {} channel {} receive {} declare msg, committed id {}, local last id {}",
1338            self.id, channel_id, from, msg.committed_id, last_id
1339        );
1340        if last_id < msg.committed_id {
1341            // TODO(patrick) receive unexpect msg
1342            debug!("receive unexpect declare msg");
1343        } else {
1344            // Receive declare message from remote, reset local tick count.
1345            self.build_channel_if_not_exists(channel_id);
1346            let channel_info = self.channels.get_mut(&channel_id).unwrap();
1347            channel_info.update_committed_id(msg.committed_id);
1348            channel_info.reset_tick();
1349        }
1350
1351        let mut msg_header = self.build_msg_header(channel_id);
1352        msg_header.to = from;
1353        msg_header.detail = MsgDetail::DeclareReply(DeclareReplyMsg {
1354            receiving_snapshot: if let SnapshotState::Loading = self.snapshot_state {
1355                true
1356            } else {
1357                false
1358            },
1359        });
1360        self.ready.send_msg(msg_header);
1361    }
1362
1363    fn handle_declare_reply(&mut self, from: u64, channel_id: u64, msg: DeclareReplyMsg) {
1364        match self.channels.get_mut(&channel_id) {
1365            Some(c) => {
1366                c.on_receive_msg(from);
1367                self.snapshot_receving_states
1368                    .maybe_update_remote_snapshot_state(from, msg.receiving_snapshot);
1369            }
1370            None => {}
1371        };
1372    }
1373
1374    fn handle_read(&mut self, from: u64, msg: ReadMsg) {
1375        if !self.is_index_leader() {
1376            debug!(
1377                "node {} channel {} recieve read from {}: {:?}, but I am not index leader",
1378                self.id, INDEX_CHANNEL_ID, from, msg
1379            );
1380            return;
1381        }
1382
1383        debug!(
1384            "node {} channel {} recieve read from {}: {:?}",
1385            self.id, INDEX_CHANNEL_ID, from, msg
1386        );
1387        let mut msg_header = self.build_msg_header(INDEX_CHANNEL_ID);
1388        msg_header.to = from;
1389        msg_header.detail = MsgDetail::ReadReply(ReadReplyMsg {
1390            request_id: msg.request_id,
1391            recommend_id: self.get_index_channel().current_term_safe_commit_id(),
1392        });
1393        self.ready.send_msg(msg_header);
1394    }
1395
1396    fn apply_read_reply(&mut self, request_id: u64, recommend_id: u64) {
1397        let mut idx = 0;
1398        for value in &self.pending_reads {
1399            if *value > request_id {
1400                break;
1401            }
1402            // Read request requires no reading staled values from fsm, so that,
1403            // the `recommend_id` large than the actually `committed_id`, is acceptable.
1404            self.ready.finished_reads.insert(*value, recommend_id);
1405            idx += 1;
1406        }
1407        self.pending_reads.drain(0..idx);
1408    }
1409
1410    fn handle_read_reply(&mut self, from: u64, msg: ReadReplyMsg) {
1411        self.apply_read_reply(msg.request_id, msg.recommend_id);
1412    }
1413
1414    fn handle_timeout_now(&mut self, from: u64, channel_id: u64) {
1415        if !self.channel_ids.contains(&self.id) {
1416            warn!(
1417                "node {} receive timeout-now from {}, but current membership is empty",
1418                self.id, from
1419            );
1420            return;
1421        }
1422
1423        let last_id = self.log_buffer.channel_last_entry_id(channel_id);
1424        let channel_info = self.channels.get_mut(&channel_id).unwrap();
1425        channel_info.to_candidate(last_id, "timeout-now");
1426        if channel_id == INDEX_CHANNEL_ID {
1427            self.bcast_vote_request();
1428        } else {
1429            self.bcast_prepare_request(channel_id, false);
1430        }
1431    }
1432
1433    fn handle_snapshot(&mut self, from: u64, desc: SnapshotDesc) {
1434        // TODO(patrick) check staled request.
1435        if self.snapshot_state != SnapshotState::None {
1436            warn!(
1437                "node {} already in {:?} snapshot stage, ignore new snapshot",
1438                self.id, self.snapshot_state
1439            );
1440            return;
1441        }
1442
1443        info!("node {} start receiving snapshot from {}: {:?}", self.id, from, desc);
1444        self.ready.pending_snapshot = Some(desc);
1445        self.snapshot_state = SnapshotState::Loading;
1446    }
1447
1448    fn handle_snapshot_reply(&mut self, from: u64, msg: SnapshotReplyMsg) {
1449        self.snapshot_receving_states
1450            .update_remote_snapshot_state(from, false);
1451        if msg.received {
1452            info!("node {} remote {} receive snapshot finished", self.id, from);
1453            for (channel_id, next_id) in &msg.hints {
1454                match self.channels.get_mut(channel_id) {
1455                    Some(c) => c.update_progress(from, false, *next_id, *next_id),
1456                    None => {}
1457                };
1458            }
1459        } else {
1460            // TODO(patrick) Maybe we should trigger another snapshot?
1461            debug!("node {} remote {} reject snapshot, try trigger new once", self.id, from);
1462        }
1463    }
1464
1465    fn dispatch_message(&mut self, msg: Message) {
1466        debug_assert_ne!(
1467            msg.from, self.id,
1468            "node {} receive a message {:?} from itself",
1469            self.id, msg
1470        );
1471
1472        match msg.detail {
1473            MsgDetail::Append(d) => self.handle_append(msg.from, msg.channel_id, d),
1474            MsgDetail::AppendReply(d) => self.handle_append_reply(msg.from, msg.channel_id, d),
1475            MsgDetail::Index(d) => self.handle_index(msg.from, d),
1476            MsgDetail::IndexReply(d) => self.handle_index_reply(msg.from, d),
1477            MsgDetail::Vote(d) => self.handle_vote(msg.from, d),
1478            MsgDetail::VoteReply(d) => self.handle_vote_reply(msg.from, d),
1479            MsgDetail::Prepare(d) => self.handle_prepare(msg.from, msg.channel_id, d),
1480            MsgDetail::PrepareReply(d) => self.handle_prepare_reply(msg.from, msg.channel_id, d),
1481            MsgDetail::Declare(d) => self.handle_declare(msg.from, msg.channel_id, d),
1482            MsgDetail::DeclareReply(d) => self.handle_declare_reply(msg.from, msg.channel_id, d),
1483            MsgDetail::Snapshot(d) => self.handle_snapshot(msg.from, d),
1484            MsgDetail::SnapshotReply(d) => self.handle_snapshot_reply(msg.from, d),
1485            MsgDetail::Read(d) => self.handle_read(msg.from, d),
1486            MsgDetail::ReadReply(d) => self.handle_read_reply(msg.from, d),
1487            MsgDetail::TimeoutNow => self.handle_timeout_now(msg.from, msg.channel_id),
1488            MsgDetail::None => {}
1489            _ => panic!("unknown message {:?}", msg),
1490        }
1491    }
1492
1493    fn bcast_heartbeats(&mut self, channel_id: u64) {
1494        let channel_info = self.channels.get(&channel_id).unwrap();
1495        debug!(
1496            "node {} channel {} bcast heartbeats, committed id {}",
1497            self.id, channel_id, channel_info.committed_id
1498        );
1499        let msg_header = self.build_msg_header(channel_id);
1500        for id in self.remote_ids() {
1501            let matched_committed_id = channel_info.matched_committed_id(id);
1502            debug!(
1503                "node {} channel {} send heartbeat to {}, matched committed id {}",
1504                self.id, channel_id, id, matched_committed_id
1505            );
1506            let mut msg = msg_header.clone();
1507            msg.to = id;
1508            msg.detail = MsgDetail::Declare(DeclareMsg {
1509                committed_id: matched_committed_id,
1510            });
1511            self.ready.send_msg(msg);
1512        }
1513    }
1514
1515    fn bcast_vote_request(&mut self) {
1516        let mut msg_header = self.build_msg_header(INDEX_CHANNEL_ID);
1517        msg_header.detail = MsgDetail::Vote(VoteMsg {
1518            last_index_id: self.log_buffer.last_index_id(),
1519            last_index_term: self.log_buffer.last_index_term(),
1520        });
1521        for to in self.remote_ids() {
1522            let mut msg = msg_header.clone();
1523            msg.to = to;
1524            self.ready.send_msg(msg);
1525        }
1526
1527        self.receive_granted_vote(self.id);
1528    }
1529
    /// Advances the tick counter of `channel_id` by one and reacts once
    /// `random_elect_timeout_tick` ticks have elapsed.
    ///
    /// Returns `true` when the node is the channel's leader after this tick, or
    /// when it has just become a candidate. Returns `false` otherwise.
    ///
    /// On timeout, a fresh `self.random_election_timeout_tick` is drawn, and
    /// then:
    /// - `Leader`/`Student`: the tick counter is reset and
    ///   `advance_quorum_lease()` is compared with the stage majority. Below
    ///   majority, the node steps down to a leaderless follower in the same
    ///   term ("lost-quorum").
    /// - `Follower`/`Candidate` allowed to campaign: it becomes a candidate.
    ///   Campaigning requires the local node to be a member and one of: the
    ///   channel is the index channel, the node leads the index channel, or
    ///   the channel id equals the local node id.
    /// - Anyone else: becomes a leaderless follower of `term + 1`.
    ///
    /// # Panics
    /// Panics if `channel_id` has no local state.
    fn receive_channel_tick(&mut self, channel_id: u64, random_elect_timeout_tick: u32) -> bool {
        // Read everything needed from `self` up front: `channel_info` below holds a
        // mutable borrow into `self.channels` for the rest of the function.
        let local_id = self.id;
        let config_stage = unwrap_config_stage(&self.pending_configs);
        let contains_self = self.channel_ids.contains(&local_id);
        let is_index_leader = self.is_index_leader();
        let last_id = self.log_buffer.channel_last_entry_id(channel_id);
        let channel_info = self.channels.get_mut(&channel_id).unwrap();

        channel_info.elapsed_tick += 1;
        if channel_info.elapsed_tick >= random_elect_timeout_tick {
            // Draw the next randomized timeout for subsequent ticks.
            self.random_election_timeout_tick =
                generate_election_timeout_tick(self.option.base_election_timeout_tick);
            return match &channel_info.role {
                Role::Leader | Role::Student => {
                    channel_info.elapsed_tick = 0;
                    // NOTE(review): `advance_quorum_lease()` presumably returns how many
                    // members were heard from during the last lease — confirm.
                    if channel_info.advance_quorum_lease()
                        < channel_info.stage_majority(config_stage)
                    {
                        channel_info.to_follower(
                            INVALID_NODE_ID,
                            channel_info.term,
                            last_id,
                            "lost-quorum",
                        );
                        false
                    } else {
                        // A `Student` keeps its role but does not report leadership.
                        channel_info.is_leader()
                    }
                }
                // Only members may campaign; data channels other than the node's own
                // are only campaigned for while this node leads the index channel.
                Role::Follower | Role::Candidate
                    if contains_self
                        && (channel_id == INDEX_CHANNEL_ID
                            || is_index_leader
                            || channel_id == local_id) =>
                {
                    channel_info.to_candidate(last_id, "election timeout");
                    true
                }
                _ => {
                    channel_info.to_follower(
                        INVALID_NODE_ID,
                        channel_info.term + 1,
                        last_id,
                        "election timeout",
                    );
                    false
                }
            };
        }

        return Role::Leader == channel_info.role;
    }
1582
1583    fn append_empty_entry(&mut self, channel_id: u64) {
1584        let entry_id = self.channel_next_entry_id(channel_id);
1585        let entry = Entry {
1586            request_id: INTERNAL_REQUEST,
1587            channel_id: channel_id,
1588            entry_id: entry_id,
1589            index_id: INVALID_ID,
1590            channel_term: self.get_channel_term(channel_id),
1591            message: Vec::new(),
1592            context: None,
1593            configs: None,
1594        };
1595
1596        self.append_entry(channel_id, entry);
1597        if self.is_index_leader() {
1598            self.assign_index(channel_id, entry_id);
1599        }
1600    }
1601
1602    fn channel_already_learned(&mut self, channel_id: u64) {
1603        let channel_info = self.channels.get_mut(&INDEX_CHANNEL_ID).unwrap();
1604        if channel_info.is_leader() {
1605            debug!(
1606                "node {} already step channel {} leader, ignore staled learn msg from channel {}",
1607                self.id, INDEX_CHANNEL_ID, channel_id
1608            );
1609            return;
1610        }
1611
1612        channel_info.learned_voters.insert(channel_id);
1613
1614        // We already learn all entries of that entry, we could do recovery progress and
1615        // step a leader.
1616        if channel_info.learned_voters.len() == channel_info.missed_voters.len() {
1617            assert_eq!(channel_info.learned_voters, channel_info.missed_voters);
1618            self.all_channel_already_learned();
1619        }
1620    }
1621
1622    fn maybe_rebuild_pending_configs(&mut self) {
1623        let pending_configs = rebuild_pending_configs(self.id, &self.log_buffer, &self.channel_ids);
1624        if pending_configs.is_none()
1625            || is_same_pending_configs(&self.pending_configs, &pending_configs)
1626        {
1627            return;
1628        }
1629
1630        if self.pending_configs.is_some() {
1631            self.abort_pending_configs("find refreshed pending configs");
1632        }
1633        self.pending_configs = pending_configs;
1634        self.update_channels_config_stage();
1635    }
1636
    /// Promotes this node to leader of the index channel (at the log buffer's
    /// last index id) and re-assigns index ids to entries of every channel.
    ///
    /// Channels are processed in this order:
    /// 1. The index channel's missed voters.
    /// 2. The remaining non-index channels, in `channel_ids` order.
    ///
    /// For each channel, index ids are assigned to the entry ids after its last
    /// assigned entry id, up to and including the larger of its last local entry
    /// id and its last learned entry id. A channel whose resulting last entry id
    /// is below its last assigned entry id is skipped with a warning.
    ///
    /// Finally it calls `assign_index(self.id, INVALID_ID)`, rebuilds the pending
    /// config from the log, and submits a read task.
    /// NOTE(review): what assigning `INVALID_ID` for the local channel means is
    /// defined by `assign_index`, which is not visible here.
    fn all_channel_already_learned(&mut self) {
        let last_index_id = self.log_buffer.last_index_id();
        let channel_info = self.channels.get_mut(&INDEX_CHANNEL_ID).unwrap();
        channel_info.to_leader(last_index_id, "learned");

        // Order in which channels get their indexes re-assigned:
        // 1. missed voters first, to avoid assigning an entry's index twice;
        // 2. then the other channels, assigning new indexes to entries received
        //    while the election was in progress.
        let mut recovery_channels: Vec<u64> = channel_info.missed_voters.iter().cloned().collect();
        recovery_channels.extend(
            self.channel_ids
                .iter()
                .filter(|id| !channel_info.missed_voters.contains(*id) && **id != INDEX_CHANNEL_ID)
                .cloned(),
        );
        info!(
            "node {} channel {} learned voters {:?}, missed voters {:?}",
            self.id, INDEX_CHANNEL_ID, channel_info.learned_voters, channel_info.missed_voters
        );
        for channel_id in recovery_channels {
            // Some entries may have been received while in student state, so the
            // last entry id in the local buffer is used as the last entry to index.
            let mut last_entry_id = self.log_buffer.channel_last_entry_id(channel_id);
            let last_assigned_entry_id = self.log_buffer.channel_last_assigned_entry_id(channel_id);
            let last_learned_entry_id = self
                .channels
                .get_mut(&channel_id)
                .unwrap()
                .last_learned_entry_meta()
                .id;
            if last_entry_id < last_learned_entry_id {
                // The current node must have lost some entries.
                last_entry_id = last_learned_entry_id;
            }
            if last_entry_id < last_assigned_entry_id {
                // NOTE(review): the logged interval is (last_assigned, last_entry],
                // which is reversed on this branch; the missing range is presumably
                // (last_entry, last_assigned] — confirm before relying on the log.
                warn!("node {} channel {} recovery channel {} indexes: the entries ({}, {}] is missing",
                      self.id, INDEX_CHANNEL_ID, channel_id, last_assigned_entry_id, last_entry_id);
                continue;
            }
            info!(
                "node {} channel {} re-assign index to channel {} recoveried entries ({}, {}]",
                self.id, INDEX_CHANNEL_ID, channel_id, last_assigned_entry_id, last_entry_id
            );
            // Entry ids in (last_assigned_entry_id, last_entry_id].
            for id in (last_assigned_entry_id + 1u64)..(last_entry_id + 1u64) {
                self.assign_index(channel_id, id);
            }
        }

        self.assign_index(self.id, INVALID_ID);
        self.maybe_rebuild_pending_configs();
        self.submit_read_task();
    }
1689
    /// Handles a prepare promise from node `from` for the non-index channel
    /// `channel_id`.
    ///
    /// The attached `entry_metas` are offered to the channel via
    /// `try_receive_prepare_entries`, and the promise is recorded. Once a
    /// majority has promised (under the current config stage):
    /// - When learning another node's channel (`learn` and
    ///   `channel_id != self.id`), the index channel is notified through
    ///   `channel_already_learned`.
    /// - Otherwise this node becomes the channel's leader at the channel's last
    ///   local entry id, and for its own channel it appends a no-op entry.
    ///
    /// # Panics
    ///
    /// If `channel_id` is the index channel or is not tracked.
    fn receive_prepare_promise(
        &mut self,
        from: u64,
        channel_id: u64,
        learn: bool,
        entry_metas: Vec<EntryMeta>,
    ) {
        assert_ne!(channel_id, INDEX_CHANNEL_ID);
        let config_stage = unwrap_config_stage(&self.pending_configs);
        let channel_info = self.channels.get_mut(&channel_id).unwrap();
        if channel_info.try_receive_prepare_entries(entry_metas) {
            info!("node {} channel {} learned entries from {}", self.id, channel_id, from);
        }

        channel_info.receive_promise(from);
        if channel_info.receive_majority_promise(config_stage) {
            // Save learned entries
            info!(
                "node {} channel {} has receive majority promised and learned {} entries",
                self.id,
                channel_id,
                channel_info.max_received_entries.len()
            );
            if self.id != channel_id && learn {
                self.channel_already_learned(channel_id);
            } else {
                // A majority has responded, so this node can step up as the
                // leader of this command channel.
                let channel_last_entry_id = self.log_buffer.channel_last_entry_id(channel_id);
                channel_info.to_leader(channel_last_entry_id, "granted");
                if channel_id == self.id {
                    // A no-op entry is only appended for this node's own channel.
                    self.append_empty_entry(channel_id);
                }
            }
        }
    }
1727
1728    fn promise_itself(&mut self, channel_id: u64, committed_entry_id: u64, learn: bool) {
1729        assert_ne!(channel_id, INDEX_CHANNEL_ID);
1730        let last_entry_id = self.log_buffer.channel_last_entry_id(channel_id);
1731        trace!(
1732            "node {} channel {} promise itself, committed entry id {}, last entry id {}",
1733            self.id,
1734            channel_id,
1735            committed_entry_id,
1736            last_entry_id
1737        );
1738        assert!(
1739            committed_entry_id <= last_entry_id,
1740            "committed entry id {} should less or equals to last entry id {}",
1741            committed_entry_id,
1742            last_entry_id,
1743        );
1744        let entry_metas = if learn {
1745            self.log_buffer
1746                .range_of_metas(channel_id, committed_entry_id + 1u64, last_entry_id)
1747        } else {
1748            Vec::new()
1749        };
1750        self.receive_prepare_promise(self.id, channel_id, learn, entry_metas);
1751    }
1752
1753    fn bcast_prepare_request(&mut self, channel_id: u64, learn: bool) {
1754        let matched_committed_id = self
1755            .channels
1756            .get(&channel_id)
1757            .unwrap()
1758            .matched_committed_id(self.id);
1759        debug!(
1760            "node {} channel {} bcast prepare request to {:?}, committed id {}",
1761            self.id,
1762            channel_id,
1763            self.remote_ids(),
1764            matched_committed_id,
1765        );
1766        let mut header = self.build_msg_header(channel_id);
1767        header.detail = MsgDetail::Prepare(PrepareMsg {
1768            learn,
1769            committed_entry_id: matched_committed_id,
1770        });
1771        for to in self.remote_ids() {
1772            let mut msg = header.clone();
1773            msg.to = to;
1774            self.ready.send_msg(msg);
1775        }
1776        self.promise_itself(channel_id, matched_committed_id, learn);
1777    }
1778
1779    fn send_timeout_now(&mut self, channel_id: u64, to: u64) {
1780        info!("node {} channel {} send timeout now to {}", self.id, channel_id, to);
1781        let mut msg_header = self.build_msg_header(channel_id);
1782        msg_header.to = to;
1783        msg_header.detail = MsgDetail::TimeoutNow;
1784        self.ready.send_msg(msg_header);
1785    }
1786
1787    fn transfer_leader(&mut self, channel_id: u64, to: u64) {
1788        let last_id = self.log_buffer.channel_last_entry_id(channel_id);
1789        let channel_info = self.channels.get(&channel_id).unwrap();
1790        if channel_info.is_remote_matched(to, last_id) {
1791            self.send_timeout_now(channel_id, to);
1792        }
1793    }
1794
1795    fn try_transfer_channel_leader(&mut self, channel_id: u64, to: u64) {
1796        if self.transfer.initiate(channel_id, to) {
1797            self.transfer_leader(channel_id, to);
1798        }
1799    }
1800
1801    fn advance_transfer_progress(&mut self, by: u64) {
1802        if self.transfer.doing()
1803            && self.transfer.channel_id == INDEX_CHANNEL_ID
1804            && self.transfer.to == by
1805        {
1806            self.transfer_leader(self.transfer.channel_id, self.transfer.to);
1807        }
1808    }
1809
    /// Advances every channel by one tick, using the node's randomized election
    /// timeout. For each channel where `receive_channel_tick` returns `true`:
    ///
    /// - Leader: broadcast heartbeats. For a channel whose id is neither this
    ///   node's id nor the index channel, and while no config change is pending,
    ///   also try to transfer leadership to the node whose id equals the
    ///   channel id.
    /// - Candidate of the index channel: broadcast a vote request.
    /// - Candidate of any other channel: broadcast a prepare request without
    ///   learning.
    ///
    /// # Panics
    ///
    /// On any other role, or if this node's id is not in `channel_ids` while a
    /// candidate.
    fn all_channels_tick(&mut self) {
        let random_elect_timeout_tick = self.random_election_timeout_tick;
        // Indexed loop: the body needs `&mut self`, so `channel_ids` cannot
        // stay borrowed across iterations.
        for idx in 0..self.channel_ids.len() {
            let channel_id = self.channel_ids[idx];
            if !self.receive_channel_tick(channel_id, random_elect_timeout_tick) {
                continue;
            }

            match self.channels.get(&channel_id).unwrap().role {
                Role::Leader => {
                    self.bcast_heartbeats(channel_id);
                    if channel_id != self.id
                        && channel_id != INDEX_CHANNEL_ID
                        && self.pending_configs.is_none()
                    {
                        self.try_transfer_channel_leader(channel_id, channel_id);
                    }
                }
                Role::Candidate if channel_id == INDEX_CHANNEL_ID => {
                    assert_eq!(self.channel_ids.contains(&self.id), true);
                    self.bcast_vote_request();
                }
                Role::Candidate => {
                    // for normal channels
                    assert_eq!(self.channel_ids.contains(&self.id), true);
                    self.bcast_prepare_request(channel_id, false);
                }
                _ => {
                    panic!("unexpected role")
                }
            }
        }
    }
1843
1844    fn reset_status_by_snapshot(
1845        &mut self,
1846        hard_states: &HashMap<u64, HardState>,
1847        desc: &SnapshotDesc,
1848    ) {
1849        self.log_buffer = MemStorage::recovery(self.id, &desc.channel_metas);
1850        self.snapshot_receving_states = RemoteSnapshotRecevingStates::new(self.id);
1851
1852        self.channel_ids = active_members(&desc.members);
1853        self.channel_ids.push(INDEX_CHANNEL_ID);
1854        self.channel_ids.sort();
1855
1856        let mut channels = HashMap::new();
1857        for channel_id in &self.channel_ids {
1858            let channel_last_id = self.log_buffer.channel_last_entry_id(*channel_id);
1859            let desc = ChannelDesc {
1860                channel_id: *channel_id,
1861                committed_id: channel_last_id,
1862                last_id: channel_last_id,
1863                hard_state: hard_states
1864                    .get(channel_id)
1865                    .unwrap_or(&HardState::default())
1866                    .clone(),
1867                members: desc.members.clone(),
1868            };
1869            channels.insert(*channel_id, ChannelInfo::new(self.id, &desc));
1870        }
1871
1872        self.channels = channels;
1873    }
1874
1875    fn bcast_snapshot_finished(&mut self, received: bool, desc: &SnapshotDesc) {
1876        for channel_id in self.remote_ids() {
1877            let hints = desc
1878                .channel_metas
1879                .iter()
1880                .map(|(k, v)| (*k, v.id + 1u64))
1881                .collect::<HashMap<_, _>>();
1882            let mut msg = self.build_msg_header(channel_id);
1883            msg.to = channel_id;
1884            msg.detail = MsgDetail::SnapshotReply(SnapshotReplyMsg { received, hints });
1885            self.ready.send_msg(msg);
1886        }
1887    }
1888
1889    #[cfg(test)]
1890    fn enter_replicate_state(&mut self) {
1891        self.channels
1892            .iter_mut()
1893            .map(|(_, c)| c.progress_map.iter_mut())
1894            .flatten()
1895            .for_each(|(_, p)| {
1896                p.state = ProgressState::Replicate;
1897                p.active = true;
1898            });
1899    }
1900
1901    #[cfg(test)]
1902    fn reset_member_next_id(&mut self, nodes: Vec<u64>) {
1903        self.channels
1904            .iter_mut()
1905            .map(|(_, c)| c.progress_map.iter_mut())
1906            .flatten()
1907            .for_each(|(id, p)| {
1908                if nodes.contains(id) {
1909                    p.next_id = p.match_id + 1u64;
1910                }
1911            });
1912    }
1913
1914    #[cfg(test)]
1915    fn remote_matched_id(&self, channel_id: u64, remote_id: u64) -> u64 {
1916        self.channels
1917            .get(&channel_id)
1918            .unwrap()
1919            .progress_map
1920            .get(&remote_id)
1921            .unwrap()
1922            .match_id
1923    }
1924
1925    fn check_index_leader(&self) -> Result<(), Error> {
1926        if !self.is_index_leader() {
1927            Err(Error::NotLeader)
1928        } else {
1929            Ok(())
1930        }
1931    }
1932
1933    fn check_local_leader(&self) -> Result<(), Error> {
1934        if !self.is_local_channel_leader() {
1935            Err(Error::NotLeader)
1936        } else {
1937            Ok(())
1938        }
1939    }
1940
1941    fn check_transfer_leader(&self) -> Result<(), Error> {
1942        if self.transfer.doing() && self.transfer.channel_id == INDEX_CHANNEL_ID {
1943            Err(Error::Transfering)
1944        } else {
1945            Ok(())
1946        }
1947    }
1948
1949    fn compaign_removing_channel_leaders(&mut self) -> bool {
1950        if let Some(p) = &self.pending_configs {
1951            let pending_channel_ids = p
1952                .old_configs
1953                .iter()
1954                .filter(|id| !self.is_channel_leader(**id))
1955                .cloned()
1956                .collect::<Vec<u64>>();
1957            debug!(
1958                "node {} channel {} pending configs old members {:?}, wait compaign channels {:?}",
1959                self.id, INDEX_CHANNEL_ID, p.old_configs, pending_channel_ids
1960            );
1961            if pending_channel_ids.is_empty() {
1962                return true;
1963            }
1964            for channel_id in pending_channel_ids {
1965                self.handle_timeout_now(self.id, channel_id);
1966            }
1967        }
1968        false
1969    }
1970
1971    fn submit_config_change_entry(&mut self, configs: HashSet<u64>, stage: ConfigStage) -> u64 {
1972        assert_eq!(self.is_index_leader(), true);
1973
1974        let entry_id = self.next_entry_id();
1975        let index_id = self.log_buffer.next_index_id();
1976        let local_term = self.local_entry_channel_term();
1977        let entry = Entry {
1978            request_id: CONFIG_CHANGE_ID,
1979            channel_id: self.id,
1980            entry_id: entry_id,
1981            index_id: INVALID_ID,
1982            channel_term: local_term,
1983            message: vec![],
1984            context: None,
1985            configs: Some(ChangeConfig {
1986                index_id,
1987                entry_id,
1988                term: local_term,
1989                stage,
1990                members: configs,
1991            }),
1992        };
1993
1994        self.append_entry(self.id, entry);
1995        self.assign_index(self.id, entry_id)
1996    }
1997
    /// Propagates the pending config's stage to the channels.
    ///
    /// Only the `Both` stage needs work here. Every channel in `channel_ids`
    /// enters the joint (old + new) configuration, and metas are flagged for
    /// persisting. The `Old` and `New` stages are a no-op in this function.
    ///
    /// # Panics
    ///
    /// If there is no pending config, or a channel in `channel_ids` is not
    /// tracked.
    fn update_channels_config_stage(&mut self) {
        assert_eq!(self.pending_configs.is_some(), true);
        let pending_configs = self.pending_configs.as_mut().unwrap();
        // Exhaustive on purpose: a new stage variant must be handled here.
        match &pending_configs.stage {
            ConfigStage::Old | ConfigStage::New => {}
            ConfigStage::Both => {
                for channel_id in &self.channel_ids {
                    self.channels
                        .get_mut(channel_id)
                        .unwrap()
                        .enter_both_config_stage(
                            &pending_configs.new_configs,
                            &pending_configs.old_configs,
                        );
                }
                self.ready.should_stable_metas = true;
            }
        }
    }
2017
2018    fn setup_new_config_channels<L>(&mut self, log_meta_view: &L)
2019    where
2020        L: LogMetaView,
2021    {
2022        assert_eq!(self.pending_configs.is_some(), true);
2023        let pending_configs = self.pending_configs.as_ref().unwrap();
2024        let members: Vec<u64> = pending_configs.configs.iter().cloned().collect();
2025        let mut new_channel_ids = members.clone();
2026        new_channel_ids.push(INDEX_CHANNEL_ID);
2027        new_channel_ids.sort();
2028        info!(
2029            "node {} channel {} update channels from {:?} to {:?}",
2030            self.id, INDEX_CHANNEL_ID, self.channel_ids, new_channel_ids
2031        );
2032        self.channel_ids = new_channel_ids;
2033
2034        for id in &pending_configs.old_configs {
2035            assert_ne!(*id, INDEX_CHANNEL_ID);
2036            self.channels.remove(&id);
2037        }
2038        let hard_states = log_meta_view.hard_states();
2039        for channel_id in &pending_configs.new_configs {
2040            if self.channels.contains_key(channel_id) {
2041                info!(
2042                    "node {} channel {} already exists, ignore add command",
2043                    self.id, *channel_id
2044                );
2045                continue;
2046            }
2047            let desc = ChannelDesc {
2048                channel_id: *channel_id,
2049                committed_id: 0, // we calculate committed id
2050                last_id: self.log_buffer.channel_last_entry_id(*channel_id),
2051                hard_state: hard_states
2052                    .get(channel_id)
2053                    .unwrap_or(&HardState::default())
2054                    .clone(),
2055                members: members
2056                    .iter()
2057                    .map(|id| (*id, MemberState::default()))
2058                    .collect(),
2059            };
2060            self.channels
2061                .insert(*channel_id, ChannelInfo::new(self.id, &desc));
2062        }
2063        for channel_id in &self.channel_ids {
2064            self.channels
2065                .get_mut(channel_id)
2066                .unwrap()
2067                .enter_new_config_stage(&pending_configs.new_configs, &pending_configs.old_configs);
2068        }
2069        self.ready.should_stable_metas = true;
2070    }
2071
2072    fn is_pending_configs_in_new_stage(&self) -> bool {
2073        self.pending_configs
2074            .as_ref()
2075            .map(|c| c.stage)
2076            .unwrap_or(ConfigStage::Old)
2077            == ConfigStage::New
2078    }
2079
2080    fn maybe_apply_config_change<L>(&mut self, log_meta_view: &L)
2081    where
2082        L: LogMetaView,
2083    {
2084        assert_eq!(self.pending_configs.is_some(), true);
2085
2086        let pending_configs = self.pending_configs.as_ref().unwrap();
2087        assert_eq!(pending_configs.stage, ConfigStage::New);
2088        if pending_configs.index_id <= self.log_buffer.chosen_index_id {
2089            info!(
2090                "node {} channel {} config change to {:?} success",
2091                self.id, INDEX_CHANNEL_ID, pending_configs.configs
2092            );
2093            if self.is_index_leader() {
2094                // bcast the last msgs to all old stage members to help them apply configs as
2095                // much as possible.
2096                self.bcast_each_channels(log_meta_view);
2097            }
2098            self.setup_new_config_channels(log_meta_view);
2099            self.pending_configs = None;
2100            self.ready.should_stable_metas = true;
2101        }
2102    }
2103
    /// Moves the pending config change to its next stage on the index leader.
    ///
    /// Called from `tick` once this node leads every channel being removed.
    /// - `Old` → `Both`: records the next index and entry ids, appends a `Both`
    ///   config entry, and pushes the joint stage to all channels.
    /// - `Both` → `New`: only once the `Both` entry's index id has been chosen.
    ///   It flags metas for persisting, records the next ids, and appends a
    ///   `New` config entry.
    /// - Otherwise: nothing.
    ///
    /// # Panics
    ///
    /// If this node is not the index leader, or no config change is pending.
    fn maybe_advance_config_change_stage(&mut self) {
        assert_eq!(self.is_index_leader(), true);
        assert_eq!(self.pending_configs.is_some(), true);

        let pending_configs = self.pending_configs.as_mut().unwrap();
        match &pending_configs.stage {
            ConfigStage::Old => {
                info!(
                    "node {} channel {} config entry at {:?} has already compagin all channels leader, step to {:?}",
                    self.id, INDEX_CHANNEL_ID, ConfigStage::Old, ConfigStage::Both
                );
                pending_configs.stage = ConfigStage::Both;
                pending_configs.index_id = self.log_buffer.next_index_id();
                pending_configs.entry_id = self.log_buffer.channel_next_entry_id(self.id);
                let configs = pending_configs.configs.clone();
                self.submit_config_change_entry(configs, ConfigStage::Both);
                self.update_channels_config_stage();
            }
            ConfigStage::Both if pending_configs.index_id <= self.log_buffer.chosen_index_id => {
                info!(
                    "node {} channel {} config entry at {:?} has already choosen, step to {:?}",
                    self.id,
                    INDEX_CHANNEL_ID,
                    ConfigStage::Both,
                    ConfigStage::New,
                );
                // `update_channels_config_stage` does nothing for `New`, so the
                // metas flag is raised here explicitly.
                self.ready.should_stable_metas = true;
                pending_configs.stage = ConfigStage::New;
                pending_configs.index_id = self.log_buffer.next_index_id();
                pending_configs.entry_id = self.log_buffer.channel_next_entry_id(self.id);
                let configs = pending_configs.configs.clone();
                self.submit_config_change_entry(configs, ConfigStage::New);
                self.update_channels_config_stage();
            }
            _ => {}
        }
    }
2141
2142    fn check_pending_configs(&self) -> Result<(), Error> {
2143        if let None = &self.pending_configs {
2144            Ok(())
2145        } else {
2146            Err(Error::Busy)
2147        }
2148    }
2149
2150    fn release_memory(&mut self) {
2151        let mut replicated_index_ids = self
2152            .channels
2153            .get(&INDEX_CHANNEL_ID)
2154            .unwrap()
2155            .progress_map
2156            .iter()
2157            .map(|(_, p)| p.match_id)
2158            .collect::<Vec<_>>();
2159
2160        let total_numbers = replicated_index_ids.len();
2161        if total_numbers == 0 {
2162            return;
2163        }
2164
2165        replicated_index_ids.sort();
2166        let majority_replicated_index_id =
2167            replicated_index_ids[total_numbers - crate::progress::majority(total_numbers)];
2168
2169        let min_replicated_index_id = replicated_index_ids[0];
2170        let diff = majority_replicated_index_id - min_replicated_index_id;
2171        self.log_buffer.release_entries_until(if diff > 10000 {
2172            majority_replicated_index_id - (0.618 * diff as f64) as u64
2173        } else {
2174            min_replicated_index_id
2175        });
2176    }
2177
2178    // FIXME: if no such leader was found, the read request will delay after
2179    // heartbeat. A follower forwards read requests to index leader.
2180    fn forward_read_requests(&mut self) -> bool {
2181        let index_leader_id = self.get_index_channel().leader_id;
2182        if index_leader_id == INVALID_NODE_ID || index_leader_id == self.id {
2183            warn!("node {} try forward read msg to leader, but no such leader are found", self.id);
2184            return false;
2185        }
2186
2187        debug_assert_ne!(index_leader_id, INDEX_CHANNEL_ID);
2188
2189        let mut msg = self.build_msg_header(INDEX_CHANNEL_ID);
2190        msg.to = index_leader_id;
2191
2192        let last_request_id = self.next_read_request_id - 1u64;
2193        msg.detail = MsgDetail::Read(ReadMsg {
2194            request_id: last_request_id,
2195        });
2196        self.ready.send_msg(msg);
2197
2198        debug!(
2199            "node {} forward read msg with request id {} to leader {}",
2200            self.id, last_request_id, index_leader_id
2201        );
2202
2203        true
2204    }
2205
2206    fn advance_read_requests(&mut self) {
2207        if self.undoing_reads.is_empty() {
2208            return;
2209        }
2210
2211        let is_leader = self.is_index_leader();
2212        if !is_leader && !self.forward_read_requests() {
2213            return;
2214        }
2215
2216        let mut ureads: Vec<u64> = vec![];
2217        std::mem::swap(&mut self.undoing_reads, &mut ureads);
2218        self.pending_reads.extend(ureads.into_iter());
2219
2220        if is_leader {
2221            self.apply_read_reply(
2222                self.next_read_request_id,
2223                self.get_index_channel().current_term_safe_commit_id(),
2224            );
2225        }
2226    }
2227
2228    fn submit_read_task(&mut self) -> u64 {
2229        let request_id = self.next_read_request_id;
2230        self.next_read_request_id += 1;
2231        self.undoing_reads.push(request_id);
2232
2233        request_id
2234    }
2235
2236    fn transfer_tick(&mut self) {
2237        self.transfer.elapsed();
2238        if self.transfer.timeout() {
2239            info!(
2240                "node {} channel {} transfer leadership to {} timeout, abort transfer task",
2241                self.id, self.transfer.channel_id, self.transfer.to
2242            );
2243            self.transfer.abort();
2244        }
2245    }
2246}
2247
2248fn rebuild_pending_configs(
2249    id: u64,
2250    log_buffer: &MemStorage,
2251    channel_ids: &Vec<u64>,
2252) -> Option<PendingConfigs> {
2253    let entry = match log_buffer.last_unchosen_config_change_entry() {
2254        Some(e) => e,
2255        None => return None,
2256    };
2257
2258    let config_change = entry
2259        .configs
2260        .as_ref()
2261        .expect("this entry MUST be a config change");
2262    let old_members = filter_index_channel(channel_ids);
2263    debug!(
2264        "node {} channel {} found pending config change {:?} {:?}, old members: {:?}",
2265        id, INDEX_CHANNEL_ID, entry, config_change, old_members
2266    );
2267    Some(PendingConfigs::new(
2268        entry.channel_id,
2269        config_change.index_id,
2270        config_change.entry_id,
2271        config_change.term,
2272        config_change.stage,
2273        config_change.members.clone(),
2274        &old_members,
2275    ))
2276}
2277
2278fn rebuild_channels(
2279    id: u64,
2280    members: &HashMap<u64, MemberState>,
2281    log_buffer: &MemStorage,
2282    hard_states: &HashMap<u64, HardState>,
2283) -> (Vec<u64>, HashMap<u64, ChannelInfo>) {
2284    // sort channel ids to make sure INDEX CHANNEL always the first timeout
2285    // channel.
2286    let mut channel_ids = active_members(members);
2287    channel_ids.push(INDEX_CHANNEL_ID);
2288    channel_ids.sort();
2289
2290    let mut channels = HashMap::new();
2291    for channel_id in &channel_ids {
2292        // When we recovery channels, using the applied entry id as the committed id is
2293        // safety.
2294        let committed_id = log_buffer.channel_first_entry_id(*channel_id);
2295        let desc = ChannelDesc {
2296            channel_id: *channel_id,
2297            committed_id: committed_id,
2298            last_id: log_buffer.channel_last_entry_id(*channel_id),
2299            hard_state: hard_states
2300                .get(channel_id)
2301                .unwrap_or(&HardState::default())
2302                .clone(),
2303            members: members.clone(),
2304        };
2305        channels.insert(*channel_id, ChannelInfo::new(id, &desc));
2306        assert_eq!(channels.get(channel_id).unwrap().get_local_match_id(), desc.last_id);
2307    }
2308    (channel_ids, channels)
2309}
2310
2311impl Sdcons {
2312    pub fn new(
2313        id: u64,
2314        applied_id: u64,
2315        option: SdconsOption,
2316        log_buffer: MemStorage,
2317        membership: &HashMap<u64, MemberState>,
2318        hard_states: &HashMap<u64, HardState>,
2319    ) -> Sdcons {
2320        let (channel_ids, channels) = rebuild_channels(id, membership, &log_buffer, hard_states);
2321        let pending_configs = rebuild_pending_configs(id, &log_buffer, &channel_ids);
2322        let random_tick = generate_election_timeout_tick(option.base_election_timeout_tick);
2323        let mut s = Sdcons {
2324            id,
2325            random_election_timeout_tick: random_tick,
2326            option,
2327            log_buffer,
2328            channel_ids,
2329            channels,
2330            pending_configs,
2331
2332            next_read_request_id: 1,
2333            pending_reads: VecDeque::new(),
2334            undoing_reads: Vec::new(),
2335
2336            choosen_id: applied_id,
2337            pending_id: applied_id,
2338            applied_id: applied_id,
2339
2340            transfer: TransferingRecord::default(),
2341            snapshot_state: SnapshotState::None,
2342            snapshot_receving_states: RemoteSnapshotRecevingStates::new(id),
2343
2344            ready: Ready::default(),
2345        };
2346
2347        if s.pending_configs.is_some() {
2348            s.update_channels_config_stage();
2349        }
2350
2351        s
2352    }
2353
2354    pub fn change_config(&mut self, members: Vec<u64>) -> Result<u64, Error> {
2355        self.check_index_leader()?;
2356        self.check_local_leader()?;
2357        self.check_transfer_leader()?;
2358        self.check_pending_configs()?;
2359
2360        let channel_ids = filter_index_channel(&self.channel_ids);
2361
2362        // Enter first stage, wait all leader's in
2363        let pending_configs = PendingConfigs::new(
2364            self.id,
2365            INVALID_ID,
2366            INVALID_ID,
2367            INITIAL_TERM,
2368            ConfigStage::Old,
2369            members.into_iter().collect(),
2370            &channel_ids,
2371        );
2372
2373        debug!(
2374            "node {} receive change config: {:?} new configs: {:?}, old configs: {:?}",
2375            self.id,
2376            pending_configs.configs,
2377            pending_configs.new_configs,
2378            pending_configs.old_configs
2379        );
2380
2381        self.pending_configs = Some(pending_configs);
2382        Ok(0)
2383    }
2384
2385    pub fn submit_task(&mut self, request_id: u64, task: Task) -> Result<u64, Error> {
2386        self.check_local_leader()?;
2387        self.check_transfer_leader()?;
2388
2389        let entry_id = self.next_entry_id();
2390        let entry = Entry {
2391            request_id,
2392            channel_id: self.id,
2393            entry_id: entry_id,
2394            index_id: INVALID_ID,
2395            channel_term: self.local_entry_channel_term(),
2396            message: task.message,
2397            context: task.context,
2398            configs: None,
2399        };
2400
2401        self.append_entry(self.id, entry);
2402        if self.is_index_leader() {
2403            self.assign_index(self.id, entry_id);
2404        }
2405
2406        Ok(entry_id)
2407    }
2408
2409    // Got a committed index from order leader, user should guarranted request_id is
2410    // monotonically increasing.
2411    pub fn leased_read(&mut self) -> Result<u64, Error> {
2412        self.check_local_leader()?;
2413        self.check_transfer_leader()?;
2414
2415        Ok(self.submit_read_task())
2416    }
2417
2418    pub fn tick(&mut self) {
2419        self.transfer_tick();
2420        self.all_channels_tick();
2421
2422        // Electing as removing channel leaders to ensure those channels won't recieve
2423        // any new proposal.
2424        if self.is_index_leader() {
2425            if self.compaign_removing_channel_leaders() {
2426                self.maybe_advance_config_change_stage();
2427            }
2428        } else if !self.pending_reads.is_empty() {
2429            // Otherwise, if there exists pending read reqeusts, forwards those request to
2430            // index leader.
2431            self.forward_read_requests();
2432        }
2433    }
2434
2435    pub fn step(&mut self, msg: Message) {
2436        if self.reject_staled_message(&msg) {
2437            return;
2438        }
2439        self.try_advance_channel_term(&msg);
2440        self.dispatch_message(msg);
2441    }
2442
2443    pub fn control(&mut self, c: Control) -> Result<(), Error> {
2444        match c {
2445            Control::Checkpoint => {
2446                if self.snapshot_state != SnapshotState::None {
2447                    warn!(
2448                        "node {} snapshot already in {:?} state, ignore checkpoint request",
2449                        self.id, self.snapshot_state
2450                    );
2451                } else {
2452                    self.snapshot_state = SnapshotState::Creating;
2453                }
2454            }
2455            Control::ReleaseMemory => {
2456                self.release_memory();
2457            }
2458            Control::TimeoutNow => {
2459                info!(
2460                    "node {} channel {} receive timeout now, start campaign",
2461                    self.id, INDEX_CHANNEL_ID
2462                );
2463                self.handle_timeout_now(self.id, INDEX_CHANNEL_ID);
2464            }
2465            Control::TransferLeader(node_id) => {
2466                self.check_index_leader()?;
2467                if node_id != self.id {
2468                    if !self.transfer.initiate(INDEX_CHANNEL_ID, node_id) {
2469                        return Err(Error::Transfering);
2470                    }
2471
2472                    info!(
2473                        "node {} channel {} start transfer leadership to {}",
2474                        self.id, INDEX_CHANNEL_ID, node_id
2475                    );
2476                    self.advance_transfer_progress(node_id);
2477                } else {
2478                    warn!(
2479                        "node {} channel {} try transfer leadership to self",
2480                        self.id, INDEX_CHANNEL_ID
2481                    );
2482                }
2483            }
2484        }
2485        Ok(())
2486    }
2487
2488    pub fn advance<L>(&mut self, log_meta_view: &L) -> Ready
2489    where
2490        L: LogMetaView,
2491    {
2492        self.bcast_each_channels(log_meta_view);
2493        self.advance_choose_progress();
2494        self.advance_read_requests();
2495
2496        // Avoid stable or apply entries when appling snapshot.
2497        if self.snapshot_state != SnapshotState::Loading {
2498            self.extract_chosen_entries();
2499            self.extract_unstable_entries();
2500            if self.is_pending_configs_in_new_stage() {
2501                self.maybe_apply_config_change(log_meta_view);
2502            }
2503        }
2504
2505        if self.ready.chosen_entries.len() > 0 {
2506            debug!("apply {:?}", self.ready.chosen_entries);
2507        }
2508
2509        // WARNING: collect_state_snapshot should always executes after updating
2510        // operation.
2511        self.collect_state_snapshot();
2512        std::mem::take(&mut self.ready)
2513    }
2514
2515    pub fn submit_apply_result(&mut self, _from: u64, to: u64) -> bool {
2516        self.log_buffer.submit_applied_result(to)
2517    }
2518
2519    pub fn submit_stable_result(&mut self, channel_id: u64, _from: u64, to: u64) -> bool {
2520        if let Some(channel_info) = self.channels.get_mut(&channel_id) {
2521            channel_info.update_local_match(to);
2522        }
2523        if channel_id == INDEX_CHANNEL_ID {
2524            self.log_buffer.submit_stable_index_result(to)
2525        } else {
2526            self.log_buffer.submit_stable_result(channel_id, to)
2527        }
2528    }
2529
2530    pub fn log_replicated(&mut self, node_id: u64, channel_id: u64, next_id: u64) {
2531        match self.channels.get_mut(&channel_id) {
2532            Some(c) => c.log_replicated(node_id, next_id),
2533            None => {}
2534        };
2535    }
2536
2537    pub fn finish_snapshot_loading(
2538        &mut self,
2539        received: bool,
2540        hard_states: &HashMap<u64, HardState>,
2541        desc: &SnapshotDesc,
2542    ) {
2543        if received {
2544            self.reset_status_by_snapshot(hard_states, desc);
2545        }
2546        self.bcast_snapshot_finished(received, desc);
2547        self.snapshot_state = SnapshotState::None;
2548    }
2549
2550    pub fn finish_checkpoint(&mut self) {
2551        self.snapshot_state = SnapshotState::None;
2552    }
2553
2554    pub fn is_sending_snapshot(&self) -> bool {
2555        self.snapshot_receving_states
2556            .is_any_remote_receiving_snapshot()
2557    }
2558
2559    pub fn leader_id(&self) -> Option<u64> {
2560        let leader = self.get_index_channel().leader_id;
2561        if leader == INVALID_NODE_ID {
2562            None
2563        } else {
2564            Some(leader)
2565        }
2566    }
2567}
2568
2569#[cfg(test)]
2570mod tests {
2571    use super::*;
2572
2573    use std::collections::HashMap;
2574    use std::iter::FromIterator;
2575
2576    use log::{Metadata, Record};
2577
2578    struct SimpleLogger;
2579    impl log::Log for SimpleLogger {
2580        fn enabled(&self, _metadata: &Metadata) -> bool {
2581            true
2582        }
2583
2584        fn log(&self, record: &Record) {
2585            println!(
2586                "[{} - {} - {}:{}] {}",
2587                record.level(),
2588                record.target(),
2589                record.file().unwrap_or("unknown"),
2590                record.line().unwrap_or(0),
2591                record.args()
2592            );
2593        }
2594
2595        fn flush(&self) {}
2596    }
2597
2598    static LOGGER: SimpleLogger = SimpleLogger;
2599
2600    struct LogMeta {
2601        pub first_index_id: u64,
2602        pub last_index_id: u64,
2603        pub hard_state_map: HashMap<u64, HardState>,
2604        pub members: HashMap<u64, MemberState>,
2605    }
2606
2607    impl Default for LogMeta {
2608        fn default() -> Self {
2609            LogMeta {
2610                first_index_id: INVALID_ID,
2611                last_index_id: INVALID_ID,
2612                hard_state_map: HashMap::new(),
2613                members: HashMap::new(),
2614            }
2615        }
2616    }
2617
2618    impl LogMetaView for LogMeta {
2619        fn membership(&self) -> HashMap<u64, MemberState> {
2620            self.members.clone()
2621        }
2622
2623        fn hard_states(&self) -> HashMap<u64, HardState> {
2624            self.hard_state_map.clone()
2625        }
2626
2627        fn range_of(&self, _channel_id: u64) -> (u64, u64) {
2628            (self.first_index_id, self.last_index_id)
2629        }
2630
2631        fn latest_snapshot(&self) -> Option<SnapshotDesc> {
2632            None
2633        }
2634    }
2635
2636    static SETUP_LOGGER: std::sync::Once = std::sync::Once::new();
2637    fn init_sdcons(log_meta: &LogMeta) -> Sdcons {
2638        let local_id = 1;
2639        let mut mem_store = MemStorage::new(local_id);
2640        for (id, _) in &log_meta.members {
2641            mem_store.append_entries(*id, vec![]);
2642        }
2643        init_sdcons_with_mem_store(log_meta, mem_store)
2644    }
2645
2646    fn init_sdcons_with_mem_store(log_meta: &LogMeta, mem_store: MemStorage) -> Sdcons {
2647        SETUP_LOGGER.call_once(|| {
2648            log::set_logger(&LOGGER)
2649                .map(|()| log::set_max_level(log::LevelFilter::Trace))
2650                .expect("init logger");
2651        });
2652
2653        let local_id = 1;
2654        let opt = SdconsOption::default();
2655        let hard_states = log_meta.hard_states();
2656        let membership = log_meta.membership();
2657        Sdcons::new(local_id, INVALID_ID, opt, mem_store, &membership, &hard_states)
2658    }
2659
2660    fn task(value: u8) -> Task {
2661        Task {
2662            message: vec![value],
2663            context: None,
2664        }
2665    }
2666
2667    fn wait_timeout(sd: &mut Sdcons) {
2668        info!("wait node {} election timeout", sd.id);
2669        let timeout_ticks = sd.option.base_election_timeout_tick * 2;
2670        for _ in 0..timeout_ticks {
2671            sd.tick();
2672        }
2673    }
2674
2675    fn new_entry(member_id: u64, entry_id: u64, term: u64) -> Entry {
2676        Entry {
2677            entry_id,
2678            index_id: INVALID_ID,
2679            request_id: 2,
2680            channel_id: member_id,
2681            channel_term: term,
2682            message: vec![0, 1, 2, 3],
2683            context: None,
2684            configs: None,
2685        }
2686    }
2687
2688    fn new_index_with_id(member_id: u64, index_id: u64, entry_id: u64, term: u64) -> LogIndex {
2689        LogIndex {
2690            channel_id: member_id,
2691            entry_id,
2692            term,
2693            index_id,
2694            context: None,
2695        }
2696    }
2697
2698    fn accept_prepare(ready: &mut Ready, s: &mut Sdcons, entries_map: HashMap<u64, Vec<Entry>>) {
2699        info!("accept prepare msgs of node {}", s.id);
2700        let msgs = ready
2701            .msgs
2702            .iter()
2703            .map(|(_id, msgs)| msgs.iter())
2704            .flatten()
2705            .collect::<Vec<_>>();
2706        for msg in msgs {
2707            let mut reply = Message {
2708                from: msg.to,
2709                to: msg.from,
2710                index_term: msg.index_term,
2711                channel_id: msg.channel_id,
2712                channel_term: msg.channel_term,
2713                detail: MsgDetail::None,
2714            };
2715            if let MsgDetail::Prepare(p) = &msg.detail {
2716                let mut entries = entries_map.get(&msg.to).unwrap_or(&vec![]).clone();
2717                for entry in &mut entries {
2718                    entry.channel_id = msg.channel_id;
2719                }
2720                reply.detail = MsgDetail::PrepareReply(PrepareReplyMsg {
2721                    reject: false,
2722                    learn: p.learn,
2723                    entry_metas: entries.iter().map(|e| EntryMeta::from(e)).collect(),
2724                });
2725                s.step(reply);
2726            };
2727        }
2728    }
2729
2730    fn accept_election(ready: &mut Ready, s: &mut Sdcons) {
2731        info!("accept all election msgs of node {}", s.id);
2732        let msgs = ready
2733            .msgs
2734            .iter()
2735            .map(|(_id, msgs)| msgs.iter())
2736            .flatten()
2737            .collect::<Vec<_>>();
2738        for msg in &msgs {
2739            let mut reply = Message {
2740                from: msg.to,
2741                to: msg.from,
2742                index_term: msg.index_term,
2743                channel_id: msg.channel_id,
2744                channel_term: msg.channel_term,
2745                detail: MsgDetail::None,
2746            };
2747            match &msg.detail {
2748                MsgDetail::Prepare(p) => {
2749                    reply.detail = MsgDetail::PrepareReply(PrepareReplyMsg {
2750                        reject: false,
2751                        learn: p.learn,
2752                        entry_metas: Vec::new(),
2753                    });
2754                    s.step(reply);
2755                }
2756                MsgDetail::Vote(_) => {
2757                    reply.detail = MsgDetail::VoteReply(VoteReplyMsg { reject: false });
2758                    s.step(reply);
2759                }
2760                _ => {}
2761            }
2762        }
2763    }
2764
2765    #[test]
2766    fn single_member_election() {
2767        let mut meta = LogMeta::default();
2768        meta.members.insert(1, MemberState::default());
2769        let mut s = init_sdcons(&meta);
2770
2771        wait_timeout(&mut s);
2772        assert_eq!(s.is_index_leader(), true);
2773        assert_eq!(s.is_local_channel_leader(), true);
2774    }
2775
2776    #[test]
2777    fn multi_member_election() {
2778        let mut meta = LogMeta::default();
2779        meta.members.insert(1, MemberState::default());
2780        meta.members.insert(2, MemberState::default());
2781        meta.members.insert(3, MemberState::default());
2782        let mut s = init_sdcons(&meta);
2783
2784        wait_timeout(&mut s);
2785        assert_eq!(s.is_channel_candidate(s.id), true);
2786        assert_eq!(s.is_channel_candidate(INDEX_CHANNEL_ID), true);
2787
2788        let mut ready = s.advance(&meta);
2789        accept_election(&mut ready, &mut s);
2790
2791        assert_eq!(s.is_local_channel_leader(), true);
2792        assert_eq!(s.is_index_student(), true);
2793
2794        let mut ready = s.advance(&meta);
2795        accept_election(&mut ready, &mut s);
2796        assert_eq!(s.is_index_leader(), true);
2797    }
2798
2799    #[test]
2800    fn learn_refreshed_entries() {
2801        let mut meta = LogMeta::default();
2802        meta.members.insert(1, MemberState::default());
2803        meta.members.insert(2, MemberState::default());
2804        meta.members.insert(3, MemberState::default());
2805        meta.members.insert(4, MemberState::default());
2806        meta.members.insert(5, MemberState::default());
2807        meta.hard_state_map.insert(
2808            1,
2809            HardState {
2810                voted_for: INVALID_NODE_ID,
2811                current_term: 10,
2812            },
2813        );
2814        let mut s = init_sdcons(&meta);
2815
2816        wait_timeout(&mut s);
2817        assert_eq!(s.is_channel_candidate(s.id), true);
2818        assert_eq!(s.is_channel_candidate(INDEX_CHANNEL_ID), true);
2819
2820        let mut ready = s.advance(&meta);
2821        accept_election(&mut ready, &mut s);
2822
2823        assert_eq!(s.is_local_channel_leader(), true);
2824        assert_eq!(s.is_index_student(), true);
2825
2826        let mut ready = s.advance(&meta);
2827        let mut entries_map = HashMap::new();
2828        entries_map.insert(
2829            2,
2830            vec![
2831                new_entry(1, 1, 3),
2832                new_entry(1, 2, 3),
2833                new_entry(1, 3, 3),
2834                new_entry(1, 4, 4),
2835            ],
2836        );
2837        entries_map.insert(3, vec![new_entry(1, 1, 9)]);
2838        entries_map.insert(4, vec![new_entry(1, 1, 9)]);
2839        entries_map.insert(5, vec![new_entry(1, 1, 9)]);
2840        accept_prepare(&mut ready, &mut s, entries_map);
2841
2842        assert_eq!(s.is_index_leader(), true);
2843        let c = s
2844            .channels
2845            .get(&INDEX_CHANNEL_ID)
2846            .unwrap()
2847            .learned_voters
2848            .iter()
2849            .next()
2850            .unwrap();
2851        let channel_info = s.channels.get(c).unwrap();
2852        let entry_meta = channel_info.last_learned_entry_meta();
2853        assert_eq!(entry_meta.id, 1);
2854        assert_eq!(entry_meta.term, 9);
2855    }
2856
2857    #[test]
2858    fn assign_indexes_to_entries_recieved_in_electing() {
2859        let meta = init_log_meta_with_default_members();
2860        let mut s = init_sdcons(&meta);
2861
2862        wait_timeout(&mut s);
2863        assert_eq!(s.is_channel_candidate(s.id), true);
2864        assert_eq!(s.is_channel_candidate(INDEX_CHANNEL_ID), true);
2865
2866        let mut ready = s.advance(&meta);
2867        accept_election(&mut ready, &mut s);
2868
2869        assert_eq!(s.is_local_channel_leader(), true);
2870        assert_eq!(s.is_index_student(), true);
2871
2872        // receive new entries
2873        let requests = vec![1, 2, 3];
2874        submit_tasks(&mut s, &requests);
2875
2876        let mut ready = s.advance(&meta);
2877        accept_prepare(&mut ready, &mut s, HashMap::new());
2878
2879        s.enter_replicate_state();
2880
2881        let ready = s.advance(&meta);
2882        accept_append_entries(&ready, &mut s, &HashSet::new());
2883        accept_indexes(&ready, &mut s, &HashSet::new());
2884        stable_all_entries(&ready, &mut s);
2885        stable_indexes(&ready, &mut s);
2886
2887        s.enter_replicate_state();
2888
2889        let ready = s.advance(&meta);
2890        assert_chosen_entries(&ready, &requests);
2891    }
2892
    /// Index-channel vote handling on node 1, which starts at index term 10
    /// with no vote cast. Its local indexes end at index id 2 with term 4.
    #[test]
    fn handle_vote_request() {
        let mut meta = LogMeta::default();
        meta.members.insert(1, MemberState::default());
        meta.members.insert(2, MemberState::default());
        meta.members.insert(3, MemberState::default());
        meta.hard_state_map.insert(
            INDEX_CHANNEL_ID,
            HardState {
                voted_for: INVALID_NODE_ID,
                current_term: 10,
            },
        );

        let mut mem_store = MemStorage::new(1);
        mem_store.append_entries(1, vec![]);
        mem_store.append_entries(2, vec![]);
        mem_store.append_entries(3, vec![]);
        // Local indexes: (index 1 -> entry 1, term 2), (index 2 -> entry 3, term 4).
        mem_store
            .extend_indexes(vec![new_index_with_id(1, 1, 1, 2), new_index_with_id(1, 2, 3, 4)]);
        let mut s = init_sdcons_with_mem_store(&meta, mem_store);
        assert_eq!(s.get_index_term(), 10);
        assert_eq!(s.get_index_channel().voted_for, INVALID_NODE_ID);

        // A vote carrying a stale index term (1 < 10) must not be granted.
        let mut msg = Message {
            from: 2,
            to: 1,
            index_term: 1,
            channel_id: INDEX_CHANNEL_ID,
            channel_term: 10,
            detail: MsgDetail::Vote(VoteMsg {
                last_index_id: 0,
                last_index_term: 0,
            }),
        };
        s.step(msg.clone());
        assert_eq!(s.get_index_channel().voted_for, INVALID_NODE_ID);

        // Current term, but the candidate's log (last index 0, term 0) is
        // behind the local one, so the vote is still not granted.
        msg.index_term = 10;
        s.step(msg.clone());
        assert_eq!(s.get_index_channel().voted_for, INVALID_NODE_ID);

        // A candidate at least as up to date as the local log gets the vote.
        msg.detail = MsgDetail::Vote(VoteMsg {
            last_index_id: 10,
            last_index_term: 10,
        });
        s.step(msg.clone());
        assert_eq!(s.get_index_channel().voted_for, 2);

        // The vote for this term already went to node 2; node 3 does not get it.
        msg.from = 3;
        msg.detail = MsgDetail::Vote(VoteMsg {
            last_index_id: 11,
            last_index_term: 11,
        });
        s.step(msg.clone());
        assert_eq!(s.get_index_channel().voted_for, 2);
    }
2954
    /// Prepare handling on channel 1. Node 1 starts at channel term 10 with no
    /// promise. The first proposer gets the promise, a repeated prepare keeps
    /// it, and a different proposer in the same term does not take it over.
    #[test]
    fn handle_prepare_req() {
        let mut meta = LogMeta::default();
        meta.members.insert(1, MemberState::default());
        meta.members.insert(2, MemberState::default());
        meta.members.insert(3, MemberState::default());
        meta.hard_state_map.insert(
            1,
            HardState {
                voted_for: INVALID_NODE_ID,
                current_term: 10,
            },
        );

        let mut mem_store = MemStorage::new(1);
        mem_store.append_entries(1, vec![]);
        mem_store.append_entries(2, vec![]);
        mem_store.append_entries(3, vec![]);
        mem_store
            .extend_indexes(vec![new_index_with_id(1, 1, 1, 2), new_index_with_id(1, 2, 3, 4)]);
        let mut s = init_sdcons_with_mem_store(&meta, mem_store);
        assert_eq!(s.channels.get(&1).unwrap().term, 10);
        assert_eq!(s.channels.get(&1).unwrap().voted_for, INVALID_NODE_ID);

        // Node 2 prepares channel 1 at the current term: it gets the promise.
        let mut msg = Message {
            from: 2,
            to: 1,
            index_term: 1,
            channel_id: 1,
            channel_term: 10,
            detail: MsgDetail::Prepare(PrepareMsg {
                learn: false,
                committed_entry_id: 0,
            }),
        };
        s.step(msg.clone());
        assert_eq!(s.channels.get(&1).unwrap().voted_for, 2);

        // Repeating the same prepare leaves the promise with node 2.
        s.step(msg.clone());
        assert_eq!(s.channels.get(&1).unwrap().voted_for, 2);

        // Node 3 cannot take over a promise already given in this term.
        msg.from = 3;
        s.step(msg.clone());
        assert_eq!(s.channels.get(&1).unwrap().voted_for, 2);
    }
3003
3004    #[test]
3005    fn to_follower_when_receive_high_term() {
3006        let channels = vec![INDEX_CHANNEL_ID, 1];
3007        let roles = vec![Role::Follower, Role::Leader, Role::Student, Role::Candidate];
3008        for channel_id in channels {
3009            for role in &roles {
3010                debug!("execute role {} channel id {}", role, channel_id);
3011
3012                let mut meta = LogMeta::default();
3013                meta.members.insert(1, MemberState::default());
3014                meta.members.insert(2, MemberState::default());
3015                meta.members.insert(3, MemberState::default());
3016                let mut s = init_sdcons(&meta);
3017                match role {
3018                    Role::Follower => {}
3019                    Role::Student | Role::Leader => {
3020                        wait_timeout(&mut s);
3021                        let mut ready = s.advance(&meta);
3022                        accept_election(&mut ready, &mut s);
3023                    }
3024                    Role::Candidate => {
3025                        wait_timeout(&mut s);
3026                    }
3027                }
3028
3029                let index_term = if channel_id == INDEX_CHANNEL_ID {
3030                    10
3031                } else {
3032                    1
3033                };
3034
3035                let msg = Message {
3036                    from: 2,
3037                    to: 1,
3038                    index_term,
3039                    channel_id: channel_id,
3040                    channel_term: 10,
3041                    detail: MsgDetail::None,
3042                };
3043                s.step(msg.clone());
3044
3045                assert_eq!(s.is_channel_follower(channel_id), true);
3046                assert_eq!(s.get_channel_term(channel_id), 10);
3047            }
3048        }
3049    }
3050
3051    #[test]
3052    fn to_follower_when_someone_declare_leader() {
3053        let mut meta = LogMeta::default();
3054        meta.members.insert(1, MemberState::default());
3055        meta.members.insert(2, MemberState::default());
3056        meta.members.insert(3, MemberState::default());
3057        let mut s = init_sdcons(&meta);
3058        wait_timeout(&mut s);
3059        let channel_term = s.get_channel_term(INDEX_CHANNEL_ID);
3060        let msg = Message {
3061            from: 2,
3062            to: 1,
3063            index_term: channel_term,
3064            channel_id: INDEX_CHANNEL_ID,
3065            channel_term: channel_term,
3066            detail: MsgDetail::Declare(DeclareMsg { committed_id: 0 }),
3067        };
3068        s.step(msg.clone());
3069
3070        assert_eq!(s.is_channel_follower(INDEX_CHANNEL_ID), true);
3071        assert_eq!(s.get_channel_term(INDEX_CHANNEL_ID), channel_term);
3072    }
3073
3074    fn init_log_meta_with_members(members: &Vec<u64>) -> LogMeta {
3075        let mut meta = LogMeta::default();
3076        for id in members {
3077            meta.members.insert(*id, MemberState::default());
3078        }
3079        meta
3080    }
3081
3082    fn init_log_meta_with_default_members() -> LogMeta {
3083        init_log_meta_with_members(&vec![1, 2, 3])
3084    }
3085
3086    #[test]
3087    fn bcast_indexes() {
3088        let meta = init_log_meta_with_default_members();
3089        let mut s = init_sdcons(&meta);
3090        wait_timeout(&mut s);
3091        let mut ready = s.advance(&meta);
3092        accept_election(&mut ready, &mut s);
3093        let mut ready = s.advance(&meta);
3094        accept_prepare(&mut ready, &mut s, HashMap::new());
3095        assert_eq!(s.is_index_leader(), true);
3096        let ready = s.advance(&meta);
3097        assert_eq!(ready.unstable_indexes.len() > 0, true);
3098    }
3099
3100    fn submit_tasks(s: &mut Sdcons, requests: &Vec<u64>) {
3101        for req_id in requests {
3102            s.submit_task(
3103                *req_id + FIRST_USER_REQUEST,
3104                Task {
3105                    message: vec![0, 1],
3106                    context: None,
3107                },
3108            )
3109            .expect("submit task");
3110        }
3111    }
3112
3113    fn stable_all_entries(ready: &Ready, s: &mut Sdcons) {
3114        let mut map = HashMap::new();
3115        for entry in &ready.unstable_entries {
3116            let v = map.entry(entry.channel_id).or_default();
3117            *v = entry.entry_id;
3118        }
3119        for (channel_id, v) in map {
3120            s.submit_stable_result(channel_id, 0, v);
3121        }
3122    }
3123
3124    fn stable_indexes(ready: &Ready, s: &mut Sdcons) {
3125        if ready.unstable_indexes.is_empty() {
3126            return;
3127        }
3128        let first_id = ready
3129            .unstable_indexes
3130            .first()
3131            .map(|i| i.index_id)
3132            .unwrap_or(INVALID_ID);
3133        let last_id = ready
3134            .unstable_indexes
3135            .last()
3136            .map(|i| i.index_id)
3137            .unwrap_or(INVALID_ID);
3138        s.submit_stable_result(INDEX_CHANNEL_ID, first_id, last_id);
3139    }
3140
3141    fn apply_entries(ready: &Ready, s: &mut Sdcons) {
3142        s.submit_apply_result(ready.first_apply_index_id, ready.last_apply_index_id);
3143    }
3144
3145    fn assert_chosen_entries(ready: &Ready, requests: &Vec<u64>) {
3146        let chosen_requests = ready
3147            .chosen_entries
3148            .iter()
3149            .map(|e| e.request_id)
3150            .collect::<Vec<_>>();
3151        debug!("chosen entries {:?}", chosen_requests);
3152        for request_id in requests {
3153            let count = chosen_requests
3154                .iter()
3155                .filter(|id| **id == (request_id + FIRST_USER_REQUEST))
3156                .count();
3157            assert_eq!(count, 1);
3158        }
3159    }
3160
3161    fn assert_chosen_config_change_entries(ready: &Ready) {
3162        let chosen_requests = ready
3163            .chosen_entries
3164            .iter()
3165            .map(|e| e.request_id)
3166            .collect::<Vec<_>>();
3167        debug!("chosen entries {:?}", chosen_requests);
3168
3169        let count = chosen_requests
3170            .iter()
3171            .filter(|id| **id == CONFIG_CHANGE_ID)
3172            .count();
3173        assert_eq!(count, 1);
3174    }
3175
3176    fn assert_replicate_entries(ready: &Ready, target_id: u64, requests: &Vec<u64>) {
3177        let msgs = ready
3178            .msgs
3179            .iter()
3180            .map(|(_id, msgs)| msgs.iter())
3181            .flatten()
3182            .collect::<Vec<_>>();
3183        let replicate_requests = msgs
3184            .iter()
3185            .filter_map(|m| {
3186                if m.to != target_id {
3187                    None
3188                } else if let MsgDetail::Append(p) = &m.detail {
3189                    Some(p.entries.clone())
3190                } else {
3191                    None
3192                }
3193            })
3194            .flatten()
3195            .map(|e| e.request_id)
3196            .collect::<HashSet<_>>();
3197
3198        debug!("replicate entries {:?}", replicate_requests);
3199        for request_id in requests {
3200            let actual_request_id = request_id + FIRST_USER_REQUEST;
3201            assert_eq!(replicate_requests.contains(&actual_request_id), true);
3202        }
3203    }
3204
3205    #[test]
3206    fn single_member_choose() {
3207        let mut meta = LogMeta::default();
3208        meta.members.insert(1, MemberState::default());
3209        let mut s = init_sdcons(&meta);
3210        wait_timeout(&mut s);
3211        assert_eq!(s.is_index_leader(), true);
3212        assert_eq!(s.is_channel_leader(1), true);
3213
3214        let inputs = vec![vec![1, 2, 3], vec![4, 5, 6]];
3215        for requests in &inputs {
3216            submit_tasks(&mut s, requests);
3217            let ready = s.advance(&meta);
3218            assert_eq!(ready.chosen_entries.is_empty(), true);
3219            stable_all_entries(&ready, &mut s);
3220            stable_indexes(&ready, &mut s);
3221
3222            let ready = s.advance(&meta);
3223            assert_chosen_entries(&ready, requests);
3224            apply_entries(&ready, &mut s);
3225        }
3226    }
3227
3228    fn to_leader(s: &mut Sdcons, meta: &LogMeta) {
3229        if s.channels.len() == 2 {
3230            wait_timeout(s);
3231            assert_eq!(s.is_index_leader(), true);
3232            assert_eq!(s.get_index_channel().leader_id, 1);
3233            assert_eq!(s.is_channel_leader(1), true);
3234        } else {
3235            wait_timeout(s);
3236            let mut ready = s.advance(meta);
3237            accept_election(&mut ready, s);
3238            assert_eq!(s.is_index_student(), true);
3239            assert_eq!(s.get_index_channel().leader_id, 1);
3240            assert_eq!(s.is_local_channel_leader(), true);
3241            let mut ready = s.advance(meta);
3242            let map = HashMap::new();
3243            accept_prepare(&mut ready, s, map);
3244            assert_eq!(s.is_index_leader(), true);
3245        }
3246    }
3247
    /// Once node 1 leads a three-member cluster in replicate state, newly
    /// submitted entries are sent to both peers. A rejected append makes the
    /// leader resend everything from the rejected entry onwards.
    #[test]
    fn replicate_entries() {
        let meta = init_log_meta_with_default_members();
        let mut s = init_sdcons(&meta);
        to_leader(&mut s, &meta);
        s.enter_replicate_state();

        let entries = vec![1, 2, 3];
        submit_tasks(&mut s, &entries);
        let ready = s.advance(&meta);
        assert_replicate_entries(&ready, 2, &entries);
        assert_replicate_entries(&ready, 3, &entries);

        // The next batch is replicated as well.
        let entries = vec![4, 5, 6];
        submit_tasks(&mut s, &entries);
        let ready = s.advance(&meta);
        assert_replicate_entries(&ready, 2, &entries);
        assert_replicate_entries(&ready, 3, &entries);

        // A single new entry is replicated too.
        let entries = vec![7];
        submit_tasks(&mut s, &entries);
        let ready = s.advance(&meta);
        assert_replicate_entries(&ready, 2, &entries);
        assert_replicate_entries(&ready, 3, &entries);

        // Nothing new was submitted, so no messages are produced.
        let ready = s.advance(&meta);
        assert_eq!(ready.msgs.len(), 0);

        // Node 2 rejects the append at entry 1 (hint 1); the leader must then
        // resend entries 1..=7 to node 2.
        let index_term = s.get_index_term();
        let channel_term = s.get_channel_term(1);
        let msg = Message {
            from: 2,
            to: 1,
            index_term: index_term,
            channel_id: 1,
            channel_term: channel_term,
            detail: MsgDetail::AppendReply(AppendReplyMsg {
                reject: true,
                entry_id: 1,
                hint_id: 1,
            }),
        };
        s.step(msg.clone());
        let ready = s.advance(&meta);
        assert_replicate_entries(&ready, 2, &vec![1, 2, 3, 4, 5, 6, 7]);
    }
3298
3299    fn declare_leader(s: &mut Sdcons, index_term: u64, channel_id: u64, channel_term: u64) {
3300        let msg = Message {
3301            from: 2,
3302            to: 1,
3303            index_term,
3304            channel_id,
3305            channel_term,
3306            detail: MsgDetail::Declare(DeclareMsg {
3307                committed_id: INVALID_ID,
3308            }),
3309        };
3310        s.step(msg);
3311    }
3312
3313    fn declare_with_id(
3314        s: &mut Sdcons,
3315        from: u64,
3316        index_term: u64,
3317        channel_id: u64,
3318        channel_term: u64,
3319        committed_id: u64,
3320    ) {
3321        let msg = Message {
3322            from,
3323            to: s.id,
3324            index_term,
3325            channel_id,
3326            channel_term,
3327            detail: MsgDetail::Declare(DeclareMsg { committed_id }),
3328        };
3329        s.step(msg);
3330    }
3331
3332    fn assert_append_reply(ready: &Ready, to: u64, entry_id: u64, reject: bool) {
3333        let msgs = ready
3334            .msgs
3335            .iter()
3336            .map(|(_id, msgs)| msgs.iter())
3337            .flatten()
3338            .collect::<Vec<_>>();
3339        let replies = msgs
3340            .iter()
3341            .filter_map(|m| {
3342                if m.to != to {
3343                    None
3344                } else if let MsgDetail::AppendReply(p) = &m.detail {
3345                    Some(p.clone())
3346                } else {
3347                    None
3348                }
3349            })
3350            .collect::<Vec<_>>();
3351
3352        assert_eq!(replies.len(), 1);
3353        let reply = &replies[0];
3354        assert_eq!(reply.reject, reject);
3355        assert_eq!(reply.hint_id, entry_id);
3356    }
3357
3358    fn append_specified_entries(
3359        s: &mut Sdcons,
3360        channel_id: u64,
3361        channel_term: u64,
3362        prev_id: u64,
3363        prev_term: u64,
3364        entries: Vec<Entry>,
3365    ) {
3366        append_specified_entries_with_id(
3367            s,
3368            channel_id,
3369            channel_term,
3370            prev_id,
3371            prev_term,
3372            0,
3373            entries,
3374        );
3375    }
3376
3377    fn append_specified_entries_with_id(
3378        s: &mut Sdcons,
3379        channel_id: u64,
3380        channel_term: u64,
3381        prev_id: u64,
3382        prev_term: u64,
3383        committed_entry_id: u64,
3384        entries: Vec<Entry>,
3385    ) {
3386        let msg = Message {
3387            from: 2,
3388            to: 1,
3389            index_term: s.get_index_term(),
3390            channel_id,
3391            channel_term,
3392            detail: MsgDetail::Append(AppendMsg {
3393                committed_entry_id,
3394                prev_entry_id: prev_id,
3395                prev_entry_term: prev_term,
3396                entries,
3397            }),
3398        };
3399        s.step(msg);
3400    }
3401
3402    fn append_entries(
3403        s: &mut Sdcons,
3404        channel_id: u64,
3405        channel_term: u64,
3406        prev_id: u64,
3407        prev_term: u64,
3408        requests: &Vec<u64>,
3409    ) {
3410        append_entries_with_id(s, channel_id, channel_term, prev_id, prev_term, 0, requests);
3411    }
3412
3413    fn append_entries_with_id(
3414        s: &mut Sdcons,
3415        channel_id: u64,
3416        channel_term: u64,
3417        prev_id: u64,
3418        prev_term: u64,
3419        committed_entry_id: u64,
3420        requests: &Vec<u64>,
3421    ) {
3422        let mut entries = Vec::new();
3423        let mut id = prev_id + 1;
3424        for request_id in requests {
3425            entries.push(Entry {
3426                request_id: *request_id + FIRST_USER_REQUEST,
3427                channel_id,
3428                channel_term,
3429                entry_id: id,
3430                index_id: INVALID_ID,
3431                message: vec![1, 2, 3],
3432                context: None,
3433                configs: None,
3434            });
3435            id += 1;
3436        }
3437
3438        debug!("append entries {:?} with term {}", requests, channel_term);
3439        append_specified_entries_with_id(
3440            s,
3441            channel_id,
3442            channel_term,
3443            prev_id,
3444            prev_term,
3445            committed_entry_id,
3446            entries,
3447        );
3448    }
3449
    /// Feeds node 1 a sequence of `Append` messages on channel 2 (sent from node 2 by
    /// the append helpers).
    ///
    /// After each message it checks the single `AppendReply` for its reject flag and
    /// `hint_id`. Every step depends on the log state left by the previous one.
    #[test]
    fn receive_entries() {
        let meta = init_log_meta_with_default_members();
        let mut s = init_sdcons(&meta);
        let index_term = 10;
        let channel_id = 2;
        let channel_term = 11;
        declare_leader(&mut s, index_term, channel_id, channel_term);

        // NOTE(review): `channel_id` (2) is passed to `assert_append_reply` as the
        // reply recipient `to`. That matches only because the append helpers always
        // send from node 2; confirm this channel-id == sender-id coupling is intended.

        // reject large append.
        // prev_id 10 is beyond anything appended so far, so a rejection with
        // hint_id 1 is expected.
        append_entries(&mut s, channel_id, channel_term, 10, 2, &vec![1, 2, 3]);
        let ready = s.advance(&meta);
        assert_append_reply(&ready, channel_id, 1, true);

        append_entries(&mut s, channel_id, channel_term, 0, 0, &vec![1, 2, 3]);
        // current entries is [1, 2, 3]
        // Accepted; hint_id is one past the last appended id.
        let ready = s.advance(&meta);
        assert_append_reply(&ready, channel_id, 4, false);

        // truncate and accept
        // Entries 3..=5 are (re)written with term channel_term + 1 after entry 2.
        append_entries(&mut s, channel_id, channel_term + 1, 2, channel_term, &vec![3, 4, 5]);
        // current entries is [1, 2, 3, 4, 5]
        let ready = s.advance(&meta);
        assert_append_reply(&ready, channel_id, 6, false);

        // find conflict id
        // prev (5, channel_term) disagrees with entry 5's term (channel_term + 1),
        // so the append is rejected with hint_id 3, the first id written with the
        // conflicting term in the previous step.
        append_entries(&mut s, channel_id, channel_term + 2, 5, channel_term, &vec![5, 6, 7]);
        let ready = s.advance(&meta);
        assert_append_reply(&ready, channel_id, 3, true);

        // reject staled request
        // NOTE(review): despite this label, the assertion below expects an
        // *accepted* reply (reject == false, hint_id 9). This append also reports
        // committed_entry_id 5.
        append_entries_with_id(
            &mut s,
            channel_id,
            channel_term + 2,
            5,
            channel_term + 1,
            5,
            &vec![6, 7, 8],
        );
        let ready = s.advance(&meta);
        assert_append_reply(&ready, channel_id, 9, false);

        // NOTE(review): prev_term here (channel_term) differs from the term entry 3
        // was written with above (channel_term + 1), yet an accepted reply with
        // hint_id 6 is expected. Presumably this is because ids up to 5 were reported
        // committed in the previous step; confirm.
        append_entries(&mut s, channel_id, channel_term + 2, 3, channel_term, &vec![4, 5]);
        let ready = s.advance(&meta);
        assert_append_reply(&ready, channel_id, 6, false);
    }
3497
3498    fn accept_append_entries(ready: &Ready, s: &mut Sdcons, skip_nodes: &HashSet<u64>) {
3499        let msgs = ready
3500            .msgs
3501            .iter()
3502            .map(|(_id, msgs)| msgs.iter())
3503            .flatten()
3504            .collect::<Vec<_>>();
3505        for msg in msgs {
3506            if let MsgDetail::Append(p) = &msg.detail {
3507                match p.entries.last() {
3508                    Some(e) => {
3509                        if skip_nodes.contains(&msg.to) {
3510                            debug!("skip append entries to node {}", msg.to);
3511                            continue;
3512                        }
3513                        let reply = Message {
3514                            from: msg.to,
3515                            to: msg.from,
3516                            index_term: msg.index_term,
3517                            channel_id: msg.channel_id,
3518                            channel_term: msg.channel_term,
3519                            detail: MsgDetail::AppendReply(AppendReplyMsg {
3520                                entry_id: p.prev_entry_id + 1u64,
3521                                hint_id: e.entry_id + 1u64,
3522                                reject: false,
3523                            }),
3524                        };
3525                        s.step(reply);
3526                    }
3527                    _ => {}
3528                }
3529            }
3530        }
3531    }
3532
3533    fn accept_indexes(ready: &Ready, s: &mut Sdcons, skip_nodes: &HashSet<u64>) {
3534        let msgs = ready
3535            .msgs
3536            .iter()
3537            .map(|(_id, msgs)| msgs.iter())
3538            .flatten()
3539            .collect::<Vec<_>>();
3540        for msg in &msgs {
3541            if let MsgDetail::Index(p) = &msg.detail {
3542                match p.indexes.last() {
3543                    Some(i) => {
3544                        if skip_nodes.contains(&msg.to) {
3545                            debug!("skip append entries to node {}", msg.to);
3546                            continue;
3547                        }
3548                        let reply = Message {
3549                            from: msg.to,
3550                            to: msg.from,
3551                            index_term: msg.index_term,
3552                            channel_id: msg.channel_id,
3553                            channel_term: msg.channel_term,
3554                            detail: MsgDetail::IndexReply(IndexReplyMsg {
3555                                index_id: p.prev_index_id,
3556                                hint_id: i.index_id + 1u64,
3557                                reject: false,
3558                            }),
3559                        };
3560                        s.step(reply);
3561                    }
3562                    _ => {}
3563                }
3564            }
3565        }
3566    }
3567
    /// Checks that the leader only reports entries as chosen once the peer acks have
    /// arrived and both the entries and the indexes have been stabled.
    ///
    /// The check runs twice: first with all peers answering, then with node 2 skipped.
    #[test]
    fn multi_member_chosen() {
        let meta = init_log_meta_with_default_members();
        let mut s = init_sdcons(&meta);
        to_leader(&mut s, &meta);
        s.enter_replicate_state();

        let requests = vec![1];
        submit_tasks(&mut s, &requests);

        let mut skip_nodes = HashSet::new();

        // Round 1: peers acknowledge the appends and indexes.
        let ready = s.advance(&meta);
        assert_eq!(ready.chosen_entries.len(), 0);
        accept_append_entries(&ready, &mut s, &skip_nodes);
        accept_indexes(&ready, &mut s, &skip_nodes);

        // won't apply if entries not stabled.
        let ready = s.advance(&meta);
        assert_eq!(ready.chosen_entries.len(), 0);

        // only stable entries
        // Stabling entries alone is not enough; indexes must be stabled too.
        stable_all_entries(&ready, &mut s);
        let ready = s.advance(&meta);
        assert_eq!(ready.chosen_entries.len(), 0);

        stable_indexes(&ready, &mut s);
        let ready = s.advance(&meta);
        assert_chosen_entries(&ready, &requests);
        s.submit_apply_result(ready.first_apply_index_id, ready.last_apply_index_id);

        // Once applied, the same entries are not reported again.
        let ready = s.advance(&meta);
        assert_eq!(ready.chosen_entries.len(), 0);

        // only 3 response
        // Node 2's acks are dropped. Presumably the default membership is {1, 2, 3},
        // so node 3 alone still completes a majority; confirm against
        // init_log_meta_with_default_members.
        skip_nodes.insert(2);
        let requests = vec![2, 3, 4];
        submit_tasks(&mut s, &requests);

        let ready = s.advance(&meta);
        accept_append_entries(&ready, &mut s, &skip_nodes);
        accept_indexes(&ready, &mut s, &skip_nodes);

        // won't apply if entries not stabled.
        let ready = s.advance(&meta);
        assert_eq!(ready.chosen_entries.len(), 0);

        stable_all_entries(&ready, &mut s);
        stable_indexes(&ready, &mut s);

        let ready = s.advance(&meta);
        assert_chosen_entries(&ready, &requests);
        s.submit_apply_result(ready.first_apply_index_id, ready.last_apply_index_id);

        let ready = s.advance(&meta);
        assert_eq!(ready.chosen_entries.len(), 0);
    }
3625
    /// A non-leader node that receives entries (channel 2), indexes (index channel),
    /// and commit declarations for both channels from node 2 reports the entries as
    /// chosen.
    ///
    /// This happens only after its own copies have been stabled.
    #[test]
    fn multi_member_receive_chosen() {
        let meta = init_log_meta_with_default_members();
        let mut s = init_sdcons(&meta);

        let requests = vec![1, 2, 3];
        let indexes = vec![
            new_index_with_id(2, 1, 1, 2),
            new_index_with_id(2, 2, 2, 2),
            new_index_with_id(2, 3, 3, 2),
        ];
        append_entries(&mut s, 2, 2, INVALID_ID, INITIAL_TERM, &requests);
        replicate_indexes(2, INVALID_ID, INITIAL_TERM, &indexes, &mut s);
        declare_with_id(&mut s, 2, 2, 2, 2, 3); // channel 2 commit 3
        declare_with_id(&mut s, 2, 2, INDEX_CHANNEL_ID, 2, 3); // channel 0 commit 3

        // Committed remotely but not yet stabled locally: nothing is chosen.
        let ready = s.advance(&meta);
        assert_eq!(ready.chosen_entries.len(), 0);
        stable_all_entries(&ready, &mut s);
        stable_indexes(&ready, &mut s);

        let ready = s.advance(&meta);
        assert_chosen_entries(&ready, &requests);
    }
3650
3651    fn replicate_indexes(
3652        from: u64,
3653        prev_id: u64,
3654        prev_term: u64,
3655        indexes: &Vec<LogIndex>,
3656        s: &mut Sdcons,
3657    ) {
3658        let index_term = s.get_index_term();
3659        let msg = Message {
3660            from,
3661            to: s.id,
3662            index_term,
3663            channel_id: INDEX_CHANNEL_ID,
3664            channel_term: index_term,
3665            detail: MsgDetail::Index(IndexMsg {
3666                committed_index_id: 0,
3667                prev_index_id: prev_id,
3668                prev_index_term: prev_term,
3669                indexes: indexes.clone(),
3670            }),
3671        };
3672        s.step(msg);
3673    }
3674
    /// An entry that already received an index (replicated before this node became
    /// leader) should be chosen without being assigned a second index.
    #[test]
    fn only_assign_index_once() {
        // 1. 5 nodes, 2 is leader
        // 2. 1 replicate entries to 2
        // 3. 2 assign index and replicate indexes to 3
        // 4. now 2 step down, 3 is elected as leader
        // 5. 1 replicate entries to 3
        // 6. 3 should recognize the assigned entries.
        //
        // In the code below, the node under test (`s`, node 1) plays the scenario's
        // node 3. Node 2 sends it both the index and the entry.
        let meta = init_log_meta_with_members(&vec![1, 2, 3, 4, 5]);
        let mut s = init_sdcons(&meta);

        // 2 replicate indexes to 1
        let indexes = vec![new_index_with_id(2, 1, 1, 1)];
        replicate_indexes(2, INVALID_ID, INITIAL_TERM, &indexes, &mut s);

        // 1 step to leader
        to_leader(&mut s, &meta);

        // 2 replicate entries to 1
        append_entries(&mut s, 2, 2, INVALID_ID, INITIAL_TERM, &vec![1]);

        // 1 apply entries
        let ready = s.advance(&meta);
        let index_term = s.get_index_term();
        stable_all_entries(&ready, &mut s);
        stable_indexes(&ready, &mut s);
        accept_append_entries(&ready, &mut s, &HashSet::new());
        accept_indexes(&ready, &mut s, &HashSet::new());
        declare_with_id(&mut s, 2, index_term, 2, 2, 1); // commit entry
        assert_eq!(ready.chosen_entries.len(), 0);

        debug!("validate advance indexes: {:?}", s.log_buffer.indexes);
        let ready = s.advance(&meta);
        assert_chosen_entries(&ready, &vec![1]);
    }
3710
3711    #[test]
3712    fn enter_both_stage() {
3713        let meta = init_log_meta_with_members(&vec![1]);
3714        let mut s = init_sdcons(&meta);
3715        to_leader(&mut s, &meta);
3716
3717        assert_eq!(s.pending_configs.is_none(), true);
3718
3719        s.change_config(vec![1, 2, 3]).expect("success");
3720        assert_eq!(s.pending_configs.is_some(), true);
3721        assert_eq!(s.pending_configs.as_ref().unwrap().stage, ConfigStage::Old);
3722
3723        s.tick();
3724        s.advance(&meta);
3725        assert_eq!(s.pending_configs.is_some(), true);
3726        assert_eq!(s.pending_configs.as_ref().unwrap().stage, ConfigStage::Both);
3727    }
3728
    /// While a config change from {1, 2, 3} to {1, 4, 5} is in the `Both` stage,
    /// acks from the old members alone must not choose the config-change entry.
    ///
    /// It is chosen once the new members (4, 5) also respond.
    #[test]
    fn committed_should_reach_majority_in_both_stage() {
        let meta = init_log_meta_with_members(&vec![1, 2, 3]);
        let mut s = init_sdcons(&meta);
        to_leader(&mut s, &meta);

        assert_eq!(s.pending_configs.is_none(), true);

        s.change_config(vec![1, 4, 5]).expect("success");
        assert_eq!(s.pending_configs.is_some(), true);
        assert_eq!(s.pending_configs.as_ref().unwrap().stage, ConfigStage::Old);

        // The first tick makes this node a candidate for the other members'
        // channels (2 and 3) while the change stays in the Old stage.
        s.tick();
        assert_eq!(s.is_channel_candidate(2), true);
        assert_eq!(s.is_channel_candidate(3), true);
        assert_eq!(s.pending_configs.is_some(), true);
        assert_eq!(s.pending_configs.as_ref().unwrap().stage, ConfigStage::Old);

        let mut ready = s.advance(&meta);
        accept_prepare(&mut ready, &mut s, HashMap::new());
        accept_append_entries(&ready, &mut s, &HashSet::new());
        accept_indexes(&ready, &mut s, &HashSet::new());
        assert_eq!(s.is_channel_leader(2), true);
        assert_eq!(s.is_channel_leader(3), true);

        // With channels 2 and 3 led, the next tick moves the change into Both.
        s.tick();
        assert_eq!(s.pending_configs.is_some(), true);
        assert_eq!(s.pending_configs.as_ref().unwrap().stage, ConfigStage::Both);

        s.enter_replicate_state();

        // try commit entries
        // Only the old members answer (4 and 5 are skipped): nothing is chosen.
        let mut skip_nodes = HashSet::new();
        skip_nodes.insert(4);
        skip_nodes.insert(5);
        let ready = s.advance(&meta);
        let apply_id = ready.last_apply_index_id;
        apply_entries(&ready, &mut s);
        stable_all_entries(&ready, &mut s);
        stable_indexes(&ready, &mut s);
        accept_append_entries(&ready, &mut s, &skip_nodes);
        accept_indexes(&ready, &mut s, &skip_nodes);
        let ready = s.advance(&meta);
        assert_eq!(ready.first_apply_index_id, apply_id + 1);
        assert_eq!(ready.chosen_entries.len(), 0);

        s.enter_replicate_state();

        // Now let the new members (4, 5) catch up and acknowledge as well.
        debug!("both stage recieve marjority response");
        s.reset_member_next_id(vec![4, 5]);

        let ready = s.advance(&meta);
        accept_append_entries(&ready, &mut s, &HashSet::new());
        accept_indexes(&ready, &mut s, &HashSet::new());
        let ready = s.advance(&meta);
        assert_chosen_config_change_entries(&ready);
    }
3786
    /// Makes `s` the leader, starts a config change to `new_configs`, and drives it
    /// into the `Both` stage.
    ///
    /// The order of the calls below matters: the first tick is followed by an
    /// advance whose prepare/append/index messages are all acknowledged, so `s` wins
    /// the peers' channels. A second tick then promotes the pending change.
    fn to_both_stage(s: &mut Sdcons, log_meta: &LogMeta, new_configs: Vec<u64>) {
        to_leader(s, log_meta);
        s.change_config(new_configs).expect("success");
        s.tick();

        // elect as channel leader
        let mut ready = s.advance(log_meta);
        accept_prepare(&mut ready, s, HashMap::new());
        accept_append_entries(&ready, s, &HashSet::new());
        accept_indexes(&ready, s, &HashSet::new());

        // enter both stage
        s.enter_replicate_state();
        s.tick();
    }
3802
    /// Runs one complete round on `s`.
    ///
    /// The round advances, then applies and stables everything the resulting
    /// `Ready` asks for. Finally it acknowledges every outgoing append and index
    /// message except those addressed to nodes in `skip_nodes`.
    fn advance_all(s: &mut Sdcons, log_meta: &LogMeta, skip_nodes: &HashSet<u64>) {
        let ready = s.advance(log_meta);
        // Local work first...
        apply_entries(&ready, s);
        stable_all_entries(&ready, s);
        stable_indexes(&ready, s);
        // ...then simulated peer acknowledgements.
        accept_append_entries(&ready, s, skip_nodes);
        accept_indexes(&ready, s, skip_nodes);
    }
3811
3812    fn advance_all_and_count_msg<F>(
3813        s: &mut Sdcons,
3814        log_meta: &LogMeta,
3815        skip_nodes: &HashSet<u64>,
3816        is_expect_msg_fn: F,
3817    ) -> usize
3818    where
3819        F: Fn(&Message) -> bool,
3820    {
3821        let ready = s.advance(log_meta);
3822        apply_entries(&ready, s);
3823        stable_all_entries(&ready, s);
3824        stable_indexes(&ready, s);
3825        accept_append_entries(&ready, s, skip_nodes);
3826        accept_indexes(&ready, s, skip_nodes);
3827
3828        ready
3829            .msgs
3830            .iter()
3831            .map(|(_id, msgs)| msgs.iter())
3832            .flatten()
3833            .filter(|msg| is_expect_msg_fn(msg))
3834            .count()
3835    }
3836
    /// Drives a config change to `new_configs` through the `Both` stage and into the
    /// `New` stage, asserting that the pending configs end up in `New`.
    ///
    /// Two full rounds are needed between the stages: one to get the `Both`-stage
    /// change accepted and one to apply it, before the next tick switches stages.
    fn to_new_stage(s: &mut Sdcons, log_meta: &LogMeta, new_configs: Vec<u64>) {
        to_both_stage(s, log_meta, new_configs);
        s.enter_replicate_state();
        // accept both stage config change
        advance_all(s, log_meta, &HashSet::new());
        // apply both stage config change
        advance_all(s, log_meta, &HashSet::new());

        // change to new stage.
        s.tick();
        assert_eq!(s.is_pending_configs_in_new_stage(), true);
    }
3849
    /// In the `New` stage, user entries become chosen once the new membership
    /// acknowledges them.
    ///
    /// The pending config change is then cleared on the following advance.
    #[test]
    fn apply_config_change() {
        let meta = init_log_meta_with_members(&vec![1, 2, 3]);
        let mut s = init_sdcons(&meta);
        to_new_stage(&mut s, &meta, vec![1, 4, 5]);

        debug!("should apply config changes");

        submit_tasks(&mut s, &vec![1, 2, 3]);
        let ready = s.advance(&meta);
        apply_entries(&ready, &mut s);
        stable_all_entries(&ready, &mut s);
        stable_indexes(&ready, &mut s);

        // New configs reach majority
        // Acks addressed to 1 and 3 are dropped, so only 2, 4 and 5 acknowledge.
        let skip_nodes: HashSet<u64> = vec![1, 3].iter().cloned().collect();
        accept_append_entries(&ready, &mut s, &skip_nodes);
        accept_indexes(&ready, &mut s, &skip_nodes);

        // Apply pending configs
        s.tick();
        let ready = s.advance(&meta);
        assert_chosen_entries(&ready, &vec![1, 2, 3]);
        let _ = s.advance(&meta);
        assert_eq!(s.pending_configs.is_none(), true);
    }
3876
    /// Receiving a config-change entry under a higher term replaces the pending
    /// `Both`-stage change with the newer `New`-stage one.
    ///
    /// Node 2 must not be part of the resulting configs.
    #[test]
    fn rollback_config_change() {
        let meta = init_log_meta_with_members(&vec![1, 2, 3]);
        let mut s = init_sdcons(&meta);
        let next_id = INVALID_ID + 1u64;
        let mut channel_id = 2;
        let mut entry = Entry {
            request_id: CONFIG_CHANGE_ID,
            channel_id: channel_id,
            channel_term: 1,
            entry_id: next_id,
            index_id: INVALID_ID,
            message: vec![],
            context: None,
            configs: Some(ChangeConfig {
                index_id: next_id,
                entry_id: next_id,
                term: 1,
                stage: ConfigStage::Both,
                members: HashSet::from_iter(vec![1, 4, 5].into_iter()),
            }),
        };
        let mut index = LogIndex {
            channel_id: channel_id,
            entry_id: next_id,
            index_id: next_id,
            term: 1,
            context: None,
        };

        // First, a Both-stage change arrives on channel 2 with term 1.
        assert_eq!(s.pending_configs.is_none(), true);
        replicate_indexes(channel_id, INVALID_ID, INITIAL_TERM, &vec![index.clone()], &mut s);
        append_specified_entries(
            &mut s,
            channel_id,
            1,
            INVALID_ID,
            INITIAL_TERM,
            vec![entry.clone()],
        );
        advance_all(&mut s, &meta, &HashSet::new());

        assert_eq!(s.pending_configs.is_some(), true);
        let pending_configs = s.pending_configs.as_ref().unwrap();
        assert_eq!(pending_configs.stage, ConfigStage::Both);

        // receive large id
        // The same index/entry ids now arrive on channel 3 with term 2, carrying a
        // New-stage change.
        // NOTE(review): the append below is sent with channel_term 1 while the entry
        // itself carries channel_term 2; confirm that mismatch is intended.
        channel_id += 1;
        index.channel_id = channel_id;
        index.term = 2;
        entry.channel_id = channel_id;
        entry.channel_term = 2;
        entry.configs = Some(ChangeConfig {
            index_id: next_id,
            entry_id: next_id,
            term: 2,
            stage: ConfigStage::New,
            members: HashSet::from_iter(vec![1, 4, 5].into_iter()),
        });
        replicate_indexes(channel_id, INVALID_ID, INITIAL_TERM, &vec![index], &mut s);
        append_specified_entries(&mut s, channel_id, 1, INVALID_ID, INITIAL_TERM, vec![entry]);
        advance_all(&mut s, &meta, &HashSet::new());

        assert_eq!(s.pending_configs.is_some(), true);
        let pending_configs = s.pending_configs.as_ref().unwrap();
        assert_eq!(pending_configs.stage, ConfigStage::New);
        assert_eq!(pending_configs.configs.contains(&2), false);
    }
3945
    /// Restarting from storage that holds a `Both`-stage config-change entry
    /// recovers the pending change in the `Both` stage.
    ///
    /// After recovery, every old and new member has a channel.
    #[test]
    fn recovery_both_stage_config_change() {
        // Persisted member states: 1 in Both, 2/3 in Old (applied), and 4/5 in New
        // (not yet applied).
        let mut log_meta = LogMeta::default();
        let mut desc = MemberState {
            stage: ConfigStage::Both,
            applied: true,
        };
        log_meta.members.insert(1, desc.clone());
        desc.stage = ConfigStage::Old;
        log_meta.members.insert(2, desc.clone());
        log_meta.members.insert(3, desc.clone());

        desc.stage = ConfigStage::New;
        desc.applied = false;
        log_meta.members.insert(4, desc.clone());
        log_meta.members.insert(5, desc.clone());

        let local_id = 1;
        let mut mem_store = MemStorage::new(local_id);
        for (id, _) in &log_meta.members {
            mem_store.append_entries(*id, vec![]);
        }

        // Channel 2 holds the Both-stage config-change entry, and the index channel
        // holds a matching index.
        let next_id = INVALID_ID + 1u64;
        let channel_id = 2;
        let entry = Entry {
            request_id: CONFIG_CHANGE_ID,
            channel_id: channel_id,
            channel_term: 1,
            entry_id: next_id,
            index_id: INVALID_ID,
            message: vec![],
            context: None,
            configs: Some(ChangeConfig {
                index_id: next_id,
                entry_id: next_id,
                term: 1,
                stage: ConfigStage::Both,
                members: HashSet::from_iter(vec![1, 4, 5].into_iter()),
            }),
        };
        mem_store.append_entries(channel_id, vec![entry]);

        let index = LogIndex {
            channel_id: channel_id,
            entry_id: next_id,
            index_id: next_id,
            term: 1,
            context: None,
        };
        mem_store.extend_indexes(vec![index]);

        let mut s = init_sdcons_with_mem_store(&log_meta, mem_store);
        assert_eq!(s.pending_configs.is_some(), true);
        let pending_configs = s.pending_configs.as_ref().unwrap();
        assert_eq!(pending_configs.stage, ConfigStage::Both);
        assert_eq!(pending_configs.old_configs.contains(&2), true);
        assert_eq!(pending_configs.old_configs.contains(&3), true);
        // FIXME: the two new_configs assertions below are disabled (reason not
        // recorded here).
        // assert_eq!(pending_configs.new_configs.contains(&4), true);
        // assert_eq!(pending_configs.new_configs.contains(&5), true);
        assert_eq!(pending_configs.new_configs.contains(&1), false);
        assert_eq!(pending_configs.old_configs.contains(&1), false);
        assert_eq!(pending_configs.configs.contains(&1), true);

        s.tick();
        s.advance(&log_meta);

        // In the Both stage, channels exist for the index channel and all members.
        let channel_ids = vec![INDEX_CHANNEL_ID, 1, 2, 3, 4, 5];
        assert_eq!(s.channel_ids, channel_ids);
    }
4017
    /// Restarting from storage whose latest config-change entry is `New`-stage
    /// (while the persisted member states are still `Both`) recovers the pending
    /// change in the `New` stage.
    ///
    /// Once that entry and its index are committed, only the new members' channels
    /// remain.
    #[test]
    fn recovery_new_stage_config_change() {
        let mut log_meta = LogMeta::default();
        let mut desc = MemberState {
            stage: ConfigStage::Both,
            applied: true,
        };

        // Member state is still in both stage.
        log_meta.members.insert(1, desc.clone());
        desc.stage = ConfigStage::Old;
        log_meta.members.insert(2, desc.clone());
        log_meta.members.insert(3, desc.clone());

        desc.stage = ConfigStage::New;
        desc.applied = false;
        log_meta.members.insert(4, desc.clone());
        log_meta.members.insert(5, desc.clone());

        let local_id = 1;
        let mut mem_store = MemStorage::new(local_id);
        for (id, _) in &log_meta.members {
            mem_store.append_entries(*id, vec![]);
        }

        // Channel 2 holds a New-stage config-change entry with a matching index.
        let next_id = INVALID_ID + 1u64;
        let channel_id = 2;
        let entry = Entry {
            request_id: CONFIG_CHANGE_ID,
            channel_id: channel_id,
            channel_term: 1,
            entry_id: next_id,
            index_id: INVALID_ID,
            message: vec![],
            context: None,
            configs: Some(ChangeConfig {
                index_id: next_id,
                entry_id: next_id,
                term: 1,
                stage: ConfigStage::New,
                members: HashSet::from_iter(vec![1, 4, 5].into_iter()),
            }),
        };
        mem_store.append_entries(channel_id, vec![entry]);

        let index = LogIndex {
            channel_id: channel_id,
            entry_id: next_id,
            index_id: next_id,
            term: 1,
            context: None,
        };
        mem_store.extend_indexes(vec![index]);

        let mut s = init_sdcons_with_mem_store(&log_meta, mem_store);
        assert_eq!(s.pending_configs.is_some(), true);
        let pending_configs = s.pending_configs.as_ref().unwrap();
        info!("pending configs: {:?}", pending_configs);
        assert_eq!(pending_configs.stage, ConfigStage::New);
        assert_eq!(pending_configs.old_configs.contains(&2), true);
        assert_eq!(pending_configs.old_configs.contains(&3), true);
        // FIXME: the three new_configs assertions below are disabled (reason not
        // recorded here).
        // assert_eq!(pending_configs.new_configs.contains(&4), true);
        // assert_eq!(pending_configs.new_configs.contains(&5), true);
        // assert_eq!(pending_configs.new_configs.contains(&1), false);
        assert_eq!(pending_configs.old_configs.contains(&1), false);
        assert_eq!(pending_configs.configs.contains(&1), true);

        // commit index and config change entry.
        // Channel 0 is passed literally here; elsewhere in these tests it appears as
        // INDEX_CHANNEL_ID.
        declare_with_id(&mut s, 2, 2, 0, 2, 1);
        declare_with_id(&mut s, 2, 2, 2, 2, 1);

        s.advance(&log_meta);
        let channel_ids = vec![INDEX_CHANNEL_ID, 1, 4, 5];
        assert_eq!(s.channel_ids, channel_ids);
    }
4094
4095    fn is_timeout_now(msg: &Message) -> bool {
4096        println!("msg: {:?}", msg);
4097        if let MsgDetail::TimeoutNow = msg.detail {
4098            true
4099        } else {
4100            false
4101        }
4102    }
4103
    /// Leadership transfer is refused by a non-leader and accepted (also when
    /// repeated) by a leader.
    ///
    /// Once accepted, the next round emits at least one `TimeoutNow` message.
    #[test]
    fn transfer_leadership_directly() {
        let meta = init_log_meta_with_members(&vec![1, 2, 3]);
        let mut s = init_sdcons(&meta);
        // Not a leader yet: the request must fail.
        let res = s.control(Control::TransferLeader(2));
        assert_eq!(res.is_err(), true);

        to_leader(&mut s, &meta);
        advance_all(&mut s, &meta, &HashSet::new());

        let res = s.control(Control::TransferLeader(2));
        assert_eq!(res.is_ok(), true);

        // Repeating the same transfer request is also accepted.
        let res = s.control(Control::TransferLeader(2));
        assert_eq!(res.is_ok(), true);

        let count = advance_all_and_count_msg(&mut s, &meta, &HashSet::new(), is_timeout_now);
        assert_eq!(count > 0, true);
    }
4123
    /// With a freshly submitted task outstanding, the leader does not send
    /// `TimeoutNow` in the first round.
    ///
    /// Once node 2's matched id on the index channel reaches the last index id, the
    /// following round does send it.
    #[test]
    fn transfer_leadership_after_matched() {
        let meta = init_log_meta_with_members(&vec![1, 2, 3]);
        let mut s = init_sdcons(&meta);
        to_leader(&mut s, &meta);

        let requests = vec![1];
        submit_tasks(&mut s, &requests);

        let res = s.control(Control::TransferLeader(2));
        assert_eq!(res.is_ok(), true);

        // First round: no TimeoutNow yet.
        let count = advance_all_and_count_msg(&mut s, &meta, &HashSet::new(), is_timeout_now);
        assert_eq!(count, 0);

        // After that round, node 2 has matched up to the last index id.
        let last_id = s.channel_next_entry_id(INDEX_CHANNEL_ID) - 1;
        assert_eq!(s.remote_matched_id(INDEX_CHANNEL_ID, 2), last_id);

        let count = advance_all_and_count_msg(&mut s, &meta, &HashSet::new(), is_timeout_now);
        assert_eq!(count > 0, true);
    }
4145
    /// On a single-member index leader recovered with two stored indexes, leased
    /// reads finish within the next advance.
    ///
    /// The finished read index is checked against the expected value.
    #[test]
    fn index_leader_read_index() {
        let meta = init_log_meta_with_members(&vec![1]);
        let mut mem_store = MemStorage::new(1);
        mem_store.append_entries(1, vec![]);
        mem_store.append_entries(2, vec![]);
        mem_store.append_entries(3, vec![]);
        mem_store
            .extend_indexes(vec![new_index_with_id(1, 1, 1, 2), new_index_with_id(1, 2, 3, 4)]);
        let mut s = init_sdcons_with_mem_store(&meta, mem_store);

        to_leader(&mut s, &meta);

        let r = s.leased_read();
        assert_eq!(r.is_ok(), true);
        let request_id = r.unwrap();

        submit_tasks(&mut s, &vec![4, 5, 6]);

        // The index leader answers the read in the next advance. Its read index is
        // expected to be 2, which the original comment describes as "larger than the
        // previous term's last id".
        let ready = s.advance(&meta);
        apply_entries(&ready, &mut s);
        stable_all_entries(&ready, &mut s);
        stable_indexes(&ready, &mut s);
        assert_eq!(ready.finished_reads.contains_key(&request_id), true);
        assert_eq!(*ready.finished_reads.get(&request_id).unwrap(), 2);

        s.advance(&meta);

        // A second read, issued after the submitted tasks were processed, must have a
        // read index of at least 6.
        let request_id = s.leased_read().unwrap();
        let ready = s.advance(&meta);
        assert_eq!(ready.finished_reads.contains_key(&request_id), true);
        info!("print {:?}", ready.finished_reads);
        assert_eq!(*ready.finished_reads.get(&request_id).unwrap() >= 6, true);
    }
4183
4184    #[test]
4185    fn index_leader_receive_read_index() {
4186        let meta = init_log_meta_with_members(&vec![1]);
4187        let mut mem_store = MemStorage::new(1);
4188        mem_store.append_entries(1, vec![]);
4189        mem_store.append_entries(2, vec![]);
4190        mem_store.append_entries(3, vec![]);
4191        mem_store
4192            .extend_indexes(vec![new_index_with_id(1, 1, 1, 2), new_index_with_id(1, 2, 3, 4)]);
4193        let mut s = init_sdcons_with_mem_store(&meta, mem_store);
4194
4195        to_leader(&mut s, &meta);
4196
4197        let sender = 2;
4198        let request_id = 123;
4199        let mut msg = s.build_msg_header(INDEX_CHANNEL_ID);
4200        msg.from = sender;
4201        msg.to = 1;
4202        msg.detail = MsgDetail::Read(ReadMsg { request_id });
4203        s.step(msg);
4204
4205        submit_tasks(&mut s, &vec![4, 5, 6]);
4206
4207        // Index leader handle request immediately, and index should large than the prev
4208        // term last id.
4209        let ready = s.advance(&meta);
4210        apply_entries(&ready, &mut s);
4211        stable_all_entries(&ready, &mut s);
4212        stable_indexes(&ready, &mut s);
4213        assert_eq!(ready.msgs.len(), 1);
4214        info!("print {:?}", ready.msgs);
4215        let msgs = ready.msgs.get(&INDEX_CHANNEL_ID).unwrap();
4216        assert_eq!(msgs.len(), 1);
4217        if let MsgDetail::ReadReply(reply) = &msgs[0].detail {
4218            assert_eq!(reply.request_id, request_id);
4219            assert_eq!(reply.recommend_id, 2);
4220        }
4221
4222        s.advance(&meta);
4223
4224        // Index should large than last index id
4225        let request_id = 123123;
4226        let mut msg = s.build_msg_header(INDEX_CHANNEL_ID);
4227        msg.from = sender;
4228        msg.to = 1;
4229        msg.detail = MsgDetail::Read(ReadMsg { request_id });
4230        s.step(msg);
4231
4232        let ready = s.advance(&meta);
4233        assert_eq!(ready.msgs.len(), 1);
4234        let msgs = ready.msgs.get(&INDEX_CHANNEL_ID).unwrap();
4235        assert_eq!(msgs.len(), 1);
4236        if let MsgDetail::ReadReply(reply) = &msgs[0].detail {
4237            assert_eq!(reply.request_id, request_id);
4238            assert_eq!(reply.recommend_id >= 6, true);
4239        }
4240    }
4241
4242    #[test]
4243    fn follower_retry_read_index_when_heartbeat() {
4244        let mut log_meta = init_log_meta_with_default_members();
4245        let mut s = init_sdcons(&log_meta);
4246
4247        to_leader(&mut s, &log_meta);
4248        let next_channel_term = s.get_index_term() + 1;
4249        declare_leader(&mut s, next_channel_term, INDEX_CHANNEL_ID, next_channel_term);
4250
4251        assert_eq!(s.is_index_leader(), false);
4252        assert_eq!(s.is_local_channel_leader(), true);
4253        s.advance(&log_meta);
4254
4255        // invoke read_index(), follower will forwards this request to index leader.
4256        let request_id = s.leased_read().unwrap();
4257        let ready = s.advance(&log_meta);
4258        assert_eq!(ready.msgs.len(), 1);
4259        let msgs = ready.msgs.get(&INDEX_CHANNEL_ID).unwrap();
4260        info!("{:?}", msgs);
4261        assert_eq!(msgs.len(), 1);
4262        if let MsgDetail::Read(req) = &msgs[0].detail {
4263            assert_eq!(req.request_id, request_id);
4264        }
4265
4266        // request timeouted, retry.
4267        s.tick();
4268        let ready = s.advance(&log_meta);
4269        let msgs = ready.msgs.get(&INDEX_CHANNEL_ID).unwrap();
4270        info!("{:?}", msgs);
4271        assert_eq!(msgs.len(), 1);
4272        if let MsgDetail::Read(req) = &msgs[0].detail {
4273            assert_eq!(req.request_id, request_id);
4274        }
4275
4276        // receive leader response
4277        let sender = 2;
4278        let mut msg = s.build_msg_header(INDEX_CHANNEL_ID);
4279        msg.from = sender;
4280        msg.to = 1;
4281        msg.detail = MsgDetail::ReadReply(ReadReplyMsg {
4282            request_id,
4283            recommend_id: 6,
4284        });
4285        s.step(msg);
4286        let ready = s.advance(&log_meta);
4287        assert_eq!(ready.finished_reads.contains_key(&request_id), true);
4288        info!("print {:?}", ready.finished_reads);
4289        assert_eq!(*ready.finished_reads.get(&request_id).unwrap() >= 6, true);
4290    }
4291}