mephisto_raft/
raw_node.rs

1// Copyright 2023 CratesLand Contributors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! The raw node of the raft module.
16//!
17//! This module contains the value types for the node and it's connection to other
18//! nodes but not the raft consensus itself. Generally, you'll interact with the
19//! RawNode first and use it to access the inner workings of the consensus protocol.
20
21use std::{
22    collections::VecDeque,
23    fmt::{Display, Formatter},
24    mem,
25};
26
27use prost::Message as PbMessage;
28use tracing::info;
29
30use crate::{
31    config::Config,
32    eraftpb::{ConfChange, ConfState, Entry, EntryType, HardState, Message, MessageType, Snapshot},
33    errors::{Error, Result},
34    read_only::ReadState,
35    storage::GetEntriesFor,
36    GetEntriesContext, Raft, SoftState, StateRole, Status, Storage,
37};
38
39/// Represents a Peer node in the cluster.
40#[derive(Debug, Default, Clone)]
41pub struct Peer {
42    /// The ID of the peer.
43    pub id: u64,
44    /// The IP address of the peer.
45    pub address: String,
46}
47
48impl Display for Peer {
49    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
50        write!(f, "n{}({})", self.id, self.address)
51    }
52}
53
54/// The status of the snapshot.
55#[derive(Debug, PartialEq, Eq, Copy, Clone)]
56pub enum SnapshotStatus {
57    /// Represents that the snapshot is finished being created.
58    Finish,
59    /// Indicates that the snapshot failed to build or is not ready.
60    Failure,
61}
62
63/// Checks if certain message type should be used internally.
64pub fn is_local_msg(t: MessageType) -> bool {
65    matches!(
66        t,
67        MessageType::MsgHup
68            | MessageType::MsgBeat
69            | MessageType::MsgUnreachable
70            | MessageType::MsgSnapStatus
71            | MessageType::MsgCheckQuorum
72    )
73}
74
75fn is_response_msg(t: MessageType) -> bool {
76    matches!(
77        t,
78        MessageType::MsgAppendResponse
79            | MessageType::MsgRequestVoteResponse
80            | MessageType::MsgHeartbeatResponse
81            | MessageType::MsgUnreachable
82            | MessageType::MsgRequestPreVoteResponse
83    )
84}
85
86/// Ready encapsulates the entries and messages that are ready to read,
87/// be saved to stable storage, committed or sent to other peers.
88#[derive(Default, Debug, PartialEq)]
89pub struct Ready {
90    number: u64,
91
92    ss: Option<SoftState>,
93
94    hs: Option<HardState>,
95
96    read_states: Vec<ReadState>,
97
98    entries: Vec<Entry>,
99
100    snapshot: Snapshot,
101
102    is_persisted_msg: bool,
103
104    light: LightReady,
105
106    must_sync: bool,
107}
108
109impl Ready {
110    /// The number of current Ready.
111    /// It is used for identifying the different Ready and ReadyRecord.
112    #[inline]
113    pub fn number(&self) -> u64 {
114        self.number
115    }
116
117    /// The current volatile state of a Node.
118    /// SoftState will be None if there is no update.
119    /// It is not required to consume or store SoftState.
120    #[inline]
121    pub fn ss(&self) -> Option<&SoftState> {
122        self.ss.as_ref()
123    }
124
125    /// The current state of a Node to be saved to stable storage.
126    /// HardState will be None state if there is no update.
127    #[inline]
128    pub fn hs(&self) -> Option<&HardState> {
129        self.hs.as_ref()
130    }
131
132    /// ReadStates specifies the state for read only query.
133    #[inline]
134    pub fn read_states(&self) -> &Vec<ReadState> {
135        &self.read_states
136    }
137
138    /// Take the ReadStates.
139    #[inline]
140    pub fn take_read_states(&mut self) -> Vec<ReadState> {
141        mem::take(&mut self.read_states)
142    }
143
144    /// Entries specifies entries to be saved to stable storage.
145    #[inline]
146    pub fn entries(&self) -> &Vec<Entry> {
147        &self.entries
148    }
149
150    /// Take the Entries.
151    #[inline]
152    pub fn take_entries(&mut self) -> Vec<Entry> {
153        mem::take(&mut self.entries)
154    }
155
156    /// Snapshot specifies the snapshot to be saved to stable storage.
157    #[inline]
158    pub fn snapshot(&self) -> &Snapshot {
159        &self.snapshot
160    }
161
162    /// CommittedEntries specifies entries to be committed to a
163    /// store/state-machine. These have previously been committed to stable
164    /// store.
165    #[inline]
166    pub fn committed_entries(&self) -> &Vec<Entry> {
167        self.light.committed_entries()
168    }
169
170    /// Take the CommitEntries.
171    #[inline]
172    pub fn take_committed_entries(&mut self) -> Vec<Entry> {
173        self.light.take_committed_entries()
174    }
175
176    /// Messages specifies outbound messages to be sent.
177    /// If it contains a MsgSnap message, the application MUST report back to raft
178    /// when the snapshot has been received or has failed by calling ReportSnapshot.
179    #[inline]
180    pub fn messages(&self) -> &[Message] {
181        if !self.is_persisted_msg {
182            self.light.messages()
183        } else {
184            &[]
185        }
186    }
187
188    /// Take the Messages.
189    #[inline]
190    pub fn take_messages(&mut self) -> Vec<Message> {
191        if !self.is_persisted_msg {
192            self.light.take_messages()
193        } else {
194            Vec::new()
195        }
196    }
197
198    /// Persisted Messages specifies outbound messages to be sent AFTER the HardState,
199    /// Entries and Snapshot are persisted to stable storage.
200    #[inline]
201    pub fn persisted_messages(&self) -> &[Message] {
202        if self.is_persisted_msg {
203            self.light.messages()
204        } else {
205            &[]
206        }
207    }
208
209    /// Take the Persisted Messages.
210    #[inline]
211    pub fn take_persisted_messages(&mut self) -> Vec<Message> {
212        if self.is_persisted_msg {
213            self.light.take_messages()
214        } else {
215            Vec::new()
216        }
217    }
218
219    /// MustSync is false if and only if
220    /// 1. no HardState or only its commit is different from before
221    /// 2. no Entries and Snapshot
222    /// If it's false, an asynchronous write of HardState is permissible before calling
223    /// [`RawNode::on_persist_ready`] or [`RawNode::advance`] or its families.
224    #[inline]
225    pub fn must_sync(&self) -> bool {
226        self.must_sync
227    }
228}
229
230/// ReadyRecord encapsulates some needed data from the corresponding Ready.
231#[derive(Default, Debug, PartialEq)]
232struct ReadyRecord {
233    number: u64,
234    // (index, term) of the last entry from the entries in Ready
235    last_entry: Option<(u64, u64)>,
236    // (index, term) of the snapshot in Ready
237    snapshot: Option<(u64, u64)>,
238}
239
240/// LightReady encapsulates the commit index, committed entries and
241/// messages that are ready to be applied or be sent to other peers.
242#[derive(Default, Debug, PartialEq)]
243pub struct LightReady {
244    commit_index: Option<u64>,
245    committed_entries: Vec<Entry>,
246    messages: Vec<Message>,
247}
248
249impl LightReady {
250    /// The current commit index.
251    /// It will be None state if there is no update.
252    /// It is not required to save it to stable storage.
253    #[inline]
254    pub fn commit_index(&self) -> Option<u64> {
255        self.commit_index
256    }
257
258    /// CommittedEntries specifies entries to be committed to a
259    /// store/state-machine. These have previously been committed to stable
260    /// store.
261    #[inline]
262    pub fn committed_entries(&self) -> &Vec<Entry> {
263        &self.committed_entries
264    }
265
266    /// Take the CommittedEntries.
267    #[inline]
268    pub fn take_committed_entries(&mut self) -> Vec<Entry> {
269        mem::take(&mut self.committed_entries)
270    }
271
272    /// Messages specifies outbound messages to be sent.
273    #[inline]
274    pub fn messages(&self) -> &[Message] {
275        &self.messages
276    }
277
278    /// Take the Messages.
279    #[inline]
280    pub fn take_messages(&mut self) -> Vec<Message> {
281        mem::take(&mut self.messages)
282    }
283}
284
285/// RawNode is a thread-unsafe Node.
286/// The methods of this struct correspond to the methods of Node and are described
287/// more fully there.
288pub struct RawNode<T: Storage> {
289    /// The internal raft state.
290    pub raft: Raft<T>,
291    prev_ss: SoftState,
292    prev_hs: HardState,
293    // Current max number of Record and ReadyRecord.
294    max_number: u64,
295    records: VecDeque<ReadyRecord>,
296    // Index which the given committed entries should start from.
297    commit_since_index: u64,
298}
299
300impl<T: Storage> RawNode<T> {
301    /// Create a new RawNode given some [`Config`].
302    pub fn new(config: &Config, store: T) -> Result<Self> {
303        assert_ne!(config.id, 0, "config.id must not be zero");
304        let r = Raft::new(config, store)?;
305        let mut rn = RawNode {
306            raft: r,
307            prev_hs: Default::default(),
308            prev_ss: Default::default(),
309            max_number: 0,
310            records: VecDeque::new(),
311            commit_since_index: config.applied,
312        };
313        rn.prev_hs = rn.raft.hard_state();
314        rn.prev_ss = rn.raft.soft_state();
315        info!("RawNode created with id {id}.", id = rn.raft.id);
316        Ok(rn)
317    }
318
319    /// Sets priority of node.
320    #[inline]
321    pub fn set_priority(&mut self, priority: i64) {
322        self.raft.set_priority(priority);
323    }
324
325    /// Tick advances the internal logical clock by a single tick.
326    ///
327    /// Returns true to indicate that there will probably be some readiness which
328    /// needs to be handled.
329    pub fn tick(&mut self) -> bool {
330        self.raft.tick()
331    }
332
333    /// Campaign causes this RawNode to transition to candidate state.
334    pub fn campaign(&mut self) -> Result<()> {
335        let mut m = Message::default();
336        m.set_msg_type(MessageType::MsgHup);
337        self.raft.step(m)
338    }
339
340    /// Propose proposes data be appended to the raft log.
341    pub fn propose(&mut self, context: Vec<u8>, data: Vec<u8>) -> Result<()> {
342        let m = Message {
343            msg_type: MessageType::MsgPropose as i32,
344            from: self.raft.id,
345            entries: vec![Entry {
346                data,
347                context,
348                ..Default::default()
349            }],
350            ..Default::default()
351        };
352        self.raft.step(m)
353    }
354
355    /// Broadcast heartbeats to all the followers.
356    ///
357    /// If it's not leader, nothing will happen.
358    pub fn ping(&mut self) {
359        self.raft.ping()
360    }
361
362    /// ProposeConfChange proposes a config change.
363    ///
364    /// If the node enters joint state with `auto_leave` set to true, it's
365    /// caller's responsibility to propose an empty conf change again to force
366    /// leaving joint state.
367    #[cfg_attr(feature = "cargo-clippy", allow(clippy::needless_pass_by_value))]
368    pub fn propose_conf_change(&mut self, context: Vec<u8>, cc: ConfChange) -> Result<()> {
369        let (data, ty) = (cc.encode_to_vec(), EntryType::EntryConfChange);
370        let m = Message {
371            msg_type: MessageType::MsgPropose as i32,
372            entries: vec![Entry {
373                entry_type: ty as i32,
374                data,
375                context,
376                ..Default::default()
377            }],
378            ..Default::default()
379        };
380        self.raft.step(m)
381    }
382
383    /// Applies a config change to the local node. The app must call this when it
384    /// applies a configuration change, except when it decides to reject the
385    /// configuration change, in which case no call must take place.
386    pub fn apply_conf_change(&mut self, cc: &ConfChange) -> Result<ConfState> {
387        self.raft.apply_conf_change(cc)
388    }
389
390    /// Step advances the state machine using the given message.
391    pub fn step(&mut self, m: Message) -> Result<()> {
392        // Ignore unexpected local messages receiving over network
393        if is_local_msg(m.msg_type()) {
394            return Err(Error::StepLocalMsg);
395        }
396        if self.raft.prs().get(m.from).is_some() || !is_response_msg(m.msg_type()) {
397            return self.raft.step(m);
398        }
399        Err(Error::StepPeerNotFound)
400    }
401
402    /// A callback when entries are fetched asynchronously.
403    /// The context should provide the context passed from Storage.entries().
404    /// See more in the comment of Storage.entries().
405    ///
406    /// # Panics
407    ///
408    /// Panics if passed with the context of context.can_async() == false
409    pub fn on_entries_fetched(&mut self, context: GetEntriesContext) {
410        match context.0 {
411            GetEntriesFor::SendAppend {
412                to,
413                term,
414                aggressively,
415            } => {
416                if self.raft.term != term || self.raft.state != StateRole::Leader {
417                    // term or leadership has changed
418                    return;
419                }
420                if self.raft.prs().get(to).is_none() {
421                    // the peer has been removed, do nothing
422                    return;
423                }
424
425                if aggressively {
426                    self.raft.send_append_aggressively(to)
427                } else {
428                    self.raft.send_append(to)
429                }
430            }
431            GetEntriesFor::Empty(can_async) if can_async => {}
432            _ => panic!("shouldn't call callback on non-async context"),
433        }
434    }
435
436    /// Generates a LightReady that has the committed entries and messages but no commit index.
437    fn gen_light_ready(&mut self) -> LightReady {
438        let mut rd = LightReady::default();
439        let max_size = Some(self.raft.max_committed_size_per_ready);
440        let raft = &mut self.raft;
441        rd.committed_entries = raft
442            .raft_log
443            .next_entries_since(self.commit_since_index, max_size)
444            .unwrap_or_default();
445        // Update raft uncommitted entries size
446        raft.reduce_uncommitted_size(&rd.committed_entries);
447        if let Some(e) = rd.committed_entries.last() {
448            assert!(self.commit_since_index < e.index);
449            self.commit_since_index = e.index;
450        }
451
452        if !raft.msgs.is_empty() {
453            rd.messages = mem::take(&mut raft.msgs);
454        }
455
456        rd
457    }
458
459    /// Returns the outstanding work that the application needs to handle.
460    ///
461    /// This includes appending and applying entries or a snapshot, updating the HardState,
462    /// and sending messages. The returned `Ready` *MUST* be handled and subsequently
463    /// passed back via `advance` or its families. Before that, *DO NOT* call any function like
464    /// `step`, `propose`, `campaign` to change internal state.
465    ///
466    /// [`Self::has_ready`] should be called first to check if it's necessary to handle the ready.
467    pub fn ready(&mut self) -> Ready {
468        let raft = &mut self.raft;
469
470        self.max_number += 1;
471        let mut rd = Ready {
472            number: self.max_number,
473            ..Default::default()
474        };
475        let mut rd_record = ReadyRecord {
476            number: self.max_number,
477            ..Default::default()
478        };
479
480        if self.prev_ss.raft_state != StateRole::Leader && raft.state == StateRole::Leader {
481            // The vote msg which makes this peer become leader has been sent after persisting.
482            // So the remaining records must be generated during being candidate which can not
483            // have last_entry and snapshot(if so, it should become follower).
484            for record in self.records.drain(..) {
485                assert_eq!(record.last_entry, None);
486                assert_eq!(record.snapshot, None);
487            }
488        }
489
490        let ss = raft.soft_state();
491        if ss != self.prev_ss {
492            rd.ss = Some(ss);
493        }
494        let hs = raft.hard_state();
495        if hs != self.prev_hs {
496            if hs.vote != self.prev_hs.vote || hs.term != self.prev_hs.term {
497                rd.must_sync = true;
498            }
499            rd.hs = Some(hs);
500        }
501
502        if !raft.read_states.is_empty() {
503            rd.read_states = mem::take(&mut raft.read_states);
504        }
505
506        if let Some(snapshot) = &raft.raft_log.unstable_snapshot() {
507            rd.snapshot = snapshot.clone();
508            assert!(self.commit_since_index <= rd.snapshot.get_metadata().index);
509            self.commit_since_index = rd.snapshot.get_metadata().index;
510            // If there is a snapshot, the latter entries can not be persisted
511            // so there is no committed entries.
512            assert!(
513                !raft
514                    .raft_log
515                    .has_next_entries_since(self.commit_since_index),
516                "has snapshot but also has committed entries since {}",
517                self.commit_since_index
518            );
519            rd_record.snapshot = Some((
520                rd.snapshot.get_metadata().index,
521                rd.snapshot.get_metadata().term,
522            ));
523            rd.must_sync = true;
524        }
525
526        rd.entries = raft.raft_log.unstable_entries().to_vec();
527        if let Some(e) = rd.entries.last() {
528            // If the last entry exists, the entries must not empty, vice versa.
529            rd.must_sync = true;
530            rd_record.last_entry = Some((e.index, e.term));
531        }
532
533        // Leader can send messages immediately to make replication concurrently.
534        // For more details, check raft thesis 10.2.1.
535        rd.is_persisted_msg = raft.state != StateRole::Leader;
536        rd.light = self.gen_light_ready();
537        self.records.push_back(rd_record);
538        rd
539    }
540
541    /// HasReady called when RawNode user need to check if any Ready pending.
542    pub fn has_ready(&self) -> bool {
543        let raft = &self.raft;
544        if !raft.msgs.is_empty() {
545            return true;
546        }
547
548        if raft.soft_state() != self.prev_ss {
549            return true;
550        }
551        if raft.hard_state() != self.prev_hs {
552            return true;
553        }
554
555        if !raft.read_states.is_empty() {
556            return true;
557        }
558
559        if !raft.raft_log.unstable_entries().is_empty() {
560            return true;
561        }
562
563        if self.snap().map_or(false, |s| !s.is_empty()) {
564            return true;
565        }
566
567        if raft
568            .raft_log
569            .has_next_entries_since(self.commit_since_index)
570        {
571            return true;
572        }
573
574        false
575    }
576
577    fn commit_ready(&mut self, rd: Ready) {
578        if let Some(ss) = rd.ss {
579            self.prev_ss = ss;
580        }
581        if let Some(hs) = rd.hs {
582            self.prev_hs = hs;
583        }
584        let rd_record = self.records.back().unwrap();
585        assert_eq!(rd_record.number, rd.number);
586        let raft = &mut self.raft;
587        if let Some((index, _)) = rd_record.snapshot {
588            raft.raft_log.stable_snap(index);
589        }
590        if let Some((index, term)) = rd_record.last_entry {
591            raft.raft_log.stable_entries(index, term);
592        }
593    }
594
595    fn commit_apply(&mut self, applied: u64) {
596        self.raft.commit_apply(applied);
597    }
598
599    /// Notifies that the ready of this number has been persisted.
600    ///
601    /// Since Ready must be persisted in order, calling this function implicitly means
602    /// all readies with numbers smaller than this one have been persisted.
603    ///
604    /// [`Self::has_ready`] and [`Self::ready`] should be called later to handle further
605    /// updates that become valid after ready being persisted.
606    pub fn on_persist_ready(&mut self, number: u64) {
607        let (mut index, mut term) = (0, 0);
608        let mut snap_index = 0;
609        while let Some(record) = self.records.front() {
610            if record.number > number {
611                break;
612            }
613            let record = self.records.pop_front().unwrap();
614
615            if let Some((i, _)) = record.snapshot {
616                snap_index = i;
617                index = 0;
618                term = 0;
619            }
620
621            if let Some((i, t)) = record.last_entry {
622                index = i;
623                term = t;
624            }
625        }
626        if snap_index != 0 {
627            self.raft.on_persist_snap(snap_index);
628        }
629        if index != 0 {
630            self.raft.on_persist_entries(index, term);
631        }
632    }
633
634    /// Advances the ready after fully processing it.
635    ///
636    /// Fully processing a ready requires to persist snapshot, entries and hard states, apply all
637    /// committed entries, send all messages.
638    ///
639    /// Returns the LightReady that contains commit index, committed entries and messages.
640    /// [`LightReady`] contains updates that only valid after persisting last ready. It should
641    /// also be fully processed. Then [`Self::advance_apply`] or [`Self::advance_apply_to`]
642    /// should be used later to update applying progress.
643    pub fn advance(&mut self, rd: Ready) -> LightReady {
644        let applied = self.commit_since_index;
645        let light_rd = self.advance_append(rd);
646        self.advance_apply_to(applied);
647        light_rd
648    }
649
650    /// Advances the ready without applying committed entries. [`Self::advance_apply`] or
651    /// [`Self::advance_apply_to`] should be used later to update applying progress.
652    ///
653    /// Returns the LightReady that contains commit index, committed entries and messages.
654    ///
655    /// Since Ready must be persisted in order, calling this function implicitly means
656    /// all ready collected before have been persisted.
657    #[inline]
658    pub fn advance_append(&mut self, rd: Ready) -> LightReady {
659        self.commit_ready(rd);
660        self.on_persist_ready(self.max_number);
661        let mut light_rd = self.gen_light_ready();
662        if self.raft.state != StateRole::Leader && !light_rd.messages().is_empty() {
663            panic!("not leader but has new msg after advance");
664        }
665        // Set commit index if it's updated
666        let hard_state = self.raft.hard_state();
667        if hard_state.commit > self.prev_hs.commit {
668            light_rd.commit_index = Some(hard_state.commit);
669            self.prev_hs.commit = hard_state.commit;
670        } else {
671            assert_eq!(hard_state.commit, self.prev_hs.commit);
672            light_rd.commit_index = None;
673        }
674        assert_eq!(hard_state, self.prev_hs, "hard state != prev_hs");
675        light_rd
676    }
677
678    /// Same as [`Self::advance_append`] except that it allows to only store the updates in cache.
679    /// [`Self::on_persist_ready`] should be used later to update the persisting progress.
680    ///
681    /// Raft works on an assumption persisted updates should not be lost, which usually requires
682    /// expensive operations like `fsync`. `advance_append_async` allows you to control the rate
683    /// of such operations and get a reasonable batch size. However, it's still required that
684    /// the updates can be read by raft from the `Storage` trait before calling
685    /// `advance_append_async`.
686    #[inline]
687    pub fn advance_append_async(&mut self, rd: Ready) {
688        self.commit_ready(rd);
689    }
690
691    /// Advance apply to the index of the last committed entries given before.
692    #[inline]
693    pub fn advance_apply(&mut self) {
694        self.commit_apply(self.commit_since_index);
695    }
696
697    /// Advance apply to the passed index.
698    #[inline]
699    pub fn advance_apply_to(&mut self, applied: u64) {
700        self.commit_apply(applied);
701    }
702
703    /// Grabs the snapshot from the raft if available.
704    #[inline]
705    pub fn snap(&self) -> Option<&Snapshot> {
706        self.raft.snap()
707    }
708
709    /// Status returns the current status of the given group.
710    #[inline]
711    pub fn status(&self) -> Status {
712        Status::new(&self.raft)
713    }
714
715    /// ReportUnreachable reports the given node is not reachable for the last send.
716    pub fn report_unreachable(&mut self, id: u64) {
717        let mut m = Message::default();
718        m.set_msg_type(MessageType::MsgUnreachable);
719        m.from = id;
720        // we don't care if it is ok actually
721        let _ = self.raft.step(m);
722    }
723
724    /// ReportSnapshot reports the status of the sent snapshot.
725    pub fn report_snapshot(&mut self, id: u64, status: SnapshotStatus) {
726        let rej = status == SnapshotStatus::Failure;
727        let mut m = Message::default();
728        m.set_msg_type(MessageType::MsgSnapStatus);
729        m.from = id;
730        m.reject = rej;
731        // we don't care if it is ok actually
732        let _ = self.raft.step(m);
733    }
734
735    /// Request a snapshot from a leader.
736    /// The snapshot's index must be greater or equal to the request_index (last_index) or
737    /// the leader's term must be greater than the request term (last_index's term).
738    pub fn request_snapshot(&mut self) -> Result<()> {
739        self.raft.request_snapshot()
740    }
741
742    /// TransferLeader tries to transfer leadership to the given transferee.
743    pub fn transfer_leader(&mut self, transferee: u64) {
744        let mut m = Message::default();
745        m.set_msg_type(MessageType::MsgTransferLeader);
746        m.from = transferee;
747        let _ = self.raft.step(m);
748    }
749
750    /// ReadIndex requests a read state. The read state will be set in ready.
751    /// Read State has a read index. Once the application advances further than the read
752    /// index, any linearizable read requests issued before the read request can be
753    /// processed safely. The read state will have the same rctx attached.
754    pub fn read_index(&mut self, rctx: Vec<u8>) {
755        let m = Message {
756            msg_type: MessageType::MsgReadIndex as i32,
757            entries: vec![Entry {
758                data: rctx,
759                ..Default::default()
760            }],
761            ..Default::default()
762        };
763        let _ = self.raft.step(m);
764    }
765
766    /// Returns the store as an immutable reference.
767    #[inline]
768    pub fn store(&self) -> &T {
769        self.raft.store()
770    }
771
772    /// Returns the store as a mutable reference.
773    #[inline]
774    pub fn mut_store(&mut self) -> &mut T {
775        self.raft.mut_store()
776    }
777
778    /// Set whether skip broadcast empty commit messages at runtime.
779    #[inline]
780    pub fn skip_bcast_commit(&mut self, skip: bool) {
781        self.raft.skip_bcast_commit(skip)
782    }
783
784    /// Set whether to batch append msg at runtime.
785    #[inline]
786    pub fn set_batch_append(&mut self, batch_append: bool) {
787        self.raft.set_batch_append(batch_append)
788    }
789}
790
791#[cfg(test)]
792mod test {
793    use super::is_local_msg;
794    use crate::eraftpb::MessageType;
795
796    #[test]
797    fn test_is_local_msg() {
798        let tests = vec![
799            (MessageType::MsgHup, true),
800            (MessageType::MsgBeat, true),
801            (MessageType::MsgUnreachable, true),
802            (MessageType::MsgSnapStatus, true),
803            (MessageType::MsgCheckQuorum, true),
804            (MessageType::MsgPropose, false),
805            (MessageType::MsgAppend, false),
806            (MessageType::MsgAppendResponse, false),
807            (MessageType::MsgRequestVote, false),
808            (MessageType::MsgRequestVoteResponse, false),
809            (MessageType::MsgSnapshot, false),
810            (MessageType::MsgHeartbeat, false),
811            (MessageType::MsgHeartbeatResponse, false),
812            (MessageType::MsgTransferLeader, false),
813            (MessageType::MsgTimeoutNow, false),
814            (MessageType::MsgReadIndex, false),
815            (MessageType::MsgReadIndexResp, false),
816            (MessageType::MsgRequestPreVote, false),
817            (MessageType::MsgRequestPreVoteResponse, false),
818        ];
819        for (msg_type, result) in tests {
820            assert_eq!(is_local_msg(msg_type), result);
821        }
822    }
823}