jopemachine_raft/
raw_node.rs

1// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
2
3// Copyright 2015 The etcd Authors
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9//     http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17//! The raw node of the raft module.
18//!
19//! This module contains the value types for the node and it's connection to other
20//! nodes but not the raft consensus itself. Generally, you'll interact with the
21//! RawNode first and use it to access the inner workings of the consensus protocol.
22
23use std::sync::Arc;
24use std::{collections::VecDeque, mem};
25
26use crate::logger::Logger;
27use protobuf::Message as PbMessage;
28use raft_proto::ConfChangeI;
29
30use crate::eraftpb::{ConfState, Entry, EntryType, HardState, Message, MessageType, Snapshot};
31use crate::errors::{Error, Result};
32use crate::read_only::ReadState;
33use crate::{config::Config, StateRole};
34use crate::{storage::GetEntriesFor, GetEntriesContext, Raft, SoftState, Status, Storage};
35
36/// Represents a Peer node in the cluster.
37#[derive(Debug, Default)]
38pub struct Peer {
39    /// The ID of the peer.
40    pub id: u64,
41    /// If there is context associated with the peer (like connection information), it can be
42    /// serialized and stored here.
43    pub context: Option<Vec<u8>>,
44}
45
46/// The status of the snapshot.
47#[derive(Debug, PartialEq, Eq, Copy, Clone)]
48pub enum SnapshotStatus {
49    /// Represents that the snapshot is finished being created.
50    Finish,
51    /// Indicates that the snapshot failed to build or is not ready.
52    Failure,
53}
54
55/// Checks if certain message type should be used internally.
56pub fn is_local_msg(t: MessageType) -> bool {
57    matches!(
58        t,
59        MessageType::MsgHup
60            | MessageType::MsgBeat
61            | MessageType::MsgUnreachable
62            | MessageType::MsgSnapStatus
63            | MessageType::MsgCheckQuorum
64    )
65}
66
67fn is_response_msg(t: MessageType) -> bool {
68    matches!(
69        t,
70        MessageType::MsgAppendResponse
71            | MessageType::MsgRequestVoteResponse
72            | MessageType::MsgHeartbeatResponse
73            | MessageType::MsgUnreachable
74            | MessageType::MsgRequestPreVoteResponse
75    )
76}
77
78/// For a given snapshot, determine if it's empty or not.
79#[deprecated(since = "0.6.0", note = "Please use `Snapshot::is_empty` instead")]
80pub fn is_empty_snap(s: &Snapshot) -> bool {
81    s.is_empty()
82}
83
84/// Ready encapsulates the entries and messages that are ready to read,
85/// be saved to stable storage, committed or sent to other peers.
86#[derive(Default, Debug, PartialEq)]
87pub struct Ready {
88    number: u64,
89
90    ss: Option<SoftState>,
91
92    hs: Option<HardState>,
93
94    read_states: Vec<ReadState>,
95
96    entries: Vec<Entry>,
97
98    snapshot: Snapshot,
99
100    is_persisted_msg: bool,
101
102    light: LightReady,
103
104    must_sync: bool,
105}
106
107impl Ready {
108    /// The number of current Ready.
109    /// It is used for identifying the different Ready and ReadyRecord.
110    #[inline]
111    pub fn number(&self) -> u64 {
112        self.number
113    }
114
115    /// The current volatile state of a Node.
116    /// SoftState will be None if there is no update.
117    /// It is not required to consume or store SoftState.
118    #[inline]
119    pub fn ss(&self) -> Option<&SoftState> {
120        self.ss.as_ref()
121    }
122
123    /// The current state of a Node to be saved to stable storage.
124    /// HardState will be None state if there is no update.
125    #[inline]
126    pub fn hs(&self) -> Option<&HardState> {
127        self.hs.as_ref()
128    }
129
130    /// ReadStates specifies the state for read only query.
131    #[inline]
132    pub fn read_states(&self) -> &Vec<ReadState> {
133        &self.read_states
134    }
135
136    /// Take the ReadStates.
137    #[inline]
138    pub fn take_read_states(&mut self) -> Vec<ReadState> {
139        mem::take(&mut self.read_states)
140    }
141
142    /// Entries specifies entries to be saved to stable storage.
143    #[inline]
144    pub fn entries(&self) -> &Vec<Entry> {
145        &self.entries
146    }
147
148    /// Take the Entries.
149    #[inline]
150    pub fn take_entries(&mut self) -> Vec<Entry> {
151        mem::take(&mut self.entries)
152    }
153
154    /// Snapshot specifies the snapshot to be saved to stable storage.
155    #[inline]
156    pub fn snapshot(&self) -> &Snapshot {
157        &self.snapshot
158    }
159
160    /// CommittedEntries specifies entries to be committed to a
161    /// store/state-machine. These have previously been committed to stable
162    /// store.
163    #[inline]
164    pub fn committed_entries(&self) -> &Vec<Entry> {
165        self.light.committed_entries()
166    }
167
168    /// Take the CommitEntries.
169    #[inline]
170    pub fn take_committed_entries(&mut self) -> Vec<Entry> {
171        self.light.take_committed_entries()
172    }
173
174    /// Messages specifies outbound messages to be sent.
175    /// If it contains a MsgSnap message, the application MUST report back to raft
176    /// when the snapshot has been received or has failed by calling ReportSnapshot.
177    #[inline]
178    pub fn messages(&self) -> &[Message] {
179        if !self.is_persisted_msg {
180            self.light.messages()
181        } else {
182            &[]
183        }
184    }
185
186    /// Take the Messages.
187    #[inline]
188    pub fn take_messages(&mut self) -> Vec<Message> {
189        if !self.is_persisted_msg {
190            self.light.take_messages()
191        } else {
192            Vec::new()
193        }
194    }
195
196    /// Persisted Messages specifies outbound messages to be sent AFTER the HardState,
197    /// Entries and Snapshot are persisted to stable storage.
198    #[inline]
199    pub fn persisted_messages(&self) -> &[Message] {
200        if self.is_persisted_msg {
201            self.light.messages()
202        } else {
203            &[]
204        }
205    }
206
207    /// Take the Persisted Messages.
208    #[inline]
209    pub fn take_persisted_messages(&mut self) -> Vec<Message> {
210        if self.is_persisted_msg {
211            self.light.take_messages()
212        } else {
213            Vec::new()
214        }
215    }
216
217    /// MustSync is false if and only if
218    /// 1. no HardState or only its commit is different from before
219    /// 2. no Entries and Snapshot
220    /// If it's false, an asynchronous write of HardState is permissible before calling
221    /// [`RawNode::on_persist_ready`] or [`RawNode::advance`] or its families.
222    #[inline]
223    pub fn must_sync(&self) -> bool {
224        self.must_sync
225    }
226}
227
228/// ReadyRecord encapsulates some needed data from the corresponding Ready.
229#[derive(Default, Debug, PartialEq)]
230struct ReadyRecord {
231    number: u64,
232    // (index, term) of the last entry from the entries in Ready
233    last_entry: Option<(u64, u64)>,
234    // (index, term) of the snapshot in Ready
235    snapshot: Option<(u64, u64)>,
236}
237
238/// LightReady encapsulates the commit index, committed entries and
239/// messages that are ready to be applied or be sent to other peers.
240#[derive(Default, Debug, PartialEq)]
241pub struct LightReady {
242    commit_index: Option<u64>,
243    committed_entries: Vec<Entry>,
244    messages: Vec<Message>,
245}
246
247impl LightReady {
248    /// The current commit index.
249    /// It will be None state if there is no update.
250    /// It is not required to save it to stable storage.
251    #[inline]
252    pub fn commit_index(&self) -> Option<u64> {
253        self.commit_index
254    }
255
256    /// CommittedEntries specifies entries to be committed to a
257    /// store/state-machine. These have previously been committed to stable
258    /// store.
259    #[inline]
260    pub fn committed_entries(&self) -> &Vec<Entry> {
261        &self.committed_entries
262    }
263
264    /// Take the CommittedEntries.
265    #[inline]
266    pub fn take_committed_entries(&mut self) -> Vec<Entry> {
267        mem::take(&mut self.committed_entries)
268    }
269
270    /// Messages specifies outbound messages to be sent.
271    #[inline]
272    pub fn messages(&self) -> &[Message] {
273        &self.messages
274    }
275
276    /// Take the Messages.
277    #[inline]
278    pub fn take_messages(&mut self) -> Vec<Message> {
279        mem::take(&mut self.messages)
280    }
281}
282
283/// RawNode is a thread-unsafe Node.
284/// The methods of this struct correspond to the methods of Node and are described
285/// more fully there.
286pub struct RawNode<T: Storage> {
287    /// The internal raft state.
288    pub raft: Raft<T>,
289    prev_ss: SoftState,
290    prev_hs: HardState,
291    // Current max number of Record and ReadyRecord.
292    max_number: u64,
293    records: VecDeque<ReadyRecord>,
294    // Index which the given committed entries should start from.
295    commit_since_index: u64,
296}
297
298impl<T: Storage> RawNode<T> {
299    #[allow(clippy::new_ret_no_self)]
300    /// Create a new RawNode given some [`Config`].
301    pub fn new(config: &Config, store: T, logger: Arc<dyn Logger>) -> Result<Self> {
302        assert_ne!(config.id, 0, "config.id must not be zero");
303        let r = Raft::new(config, store, logger)?;
304        let mut rn = RawNode {
305            raft: r,
306            prev_hs: Default::default(),
307            prev_ss: Default::default(),
308            max_number: 0,
309            records: VecDeque::new(),
310            commit_since_index: config.applied,
311        };
312        rn.prev_hs = rn.raft.hard_state();
313        rn.prev_ss = rn.raft.soft_state();
314        rn.raft
315            .logger
316            .info(format!("RawNode created with id {}.", rn.raft.id).as_str());
317        Ok(rn)
318    }
319
320    /// Create a new RawNode given some [`Config`] and the default logger.
321    ///
322    /// The default logger is an `slog` to `log` adapter.
323    #[cfg(feature = "default-logger")]
324    #[allow(clippy::new_ret_no_self)]
325    pub fn with_default_logger(c: &Config, store: T) -> Result<Self> {
326        use crate::logger::Slogger;
327
328        Self::new(
329            c,
330            store,
331            Arc::new(Slogger {
332                slog: crate::default_logger(),
333            }),
334        )
335    }
336
337    /// Sets priority of node.
338    #[inline]
339    pub fn set_priority(&mut self, priority: i64) {
340        self.raft.set_priority(priority);
341    }
342
343    /// Tick advances the internal logical clock by a single tick.
344    ///
345    /// Returns true to indicate that there will probably be some readiness which
346    /// needs to be handled.
347    pub fn tick(&mut self) -> bool {
348        self.raft.tick()
349    }
350
351    /// Campaign causes this RawNode to transition to candidate state.
352    pub fn campaign(&mut self) -> Result<()> {
353        let mut m = Message::default();
354        m.set_msg_type(MessageType::MsgHup);
355        self.raft.step(m)
356    }
357
358    /// Propose proposes data to be appended to the raft log.
359    pub fn propose(&mut self, context: Vec<u8>, data: Vec<u8>) -> Result<()> {
360        let mut m = Message::default();
361        m.set_msg_type(MessageType::MsgPropose);
362        m.from = self.raft.id;
363        let mut e = Entry::default();
364        e.data = data.into();
365        e.context = context.into();
366        m.set_entries(vec![e].into());
367        self.raft.step(m)
368    }
369
370    /// Broadcast heartbeats to all the followers.
371    ///
372    /// If it's not leader, nothing will happen.
373    pub fn ping(&mut self) {
374        self.raft.ping()
375    }
376
377    /// ProposeConfChange proposes a config change.
378    ///
379    /// If the node enters joint state with `auto_leave` set to true, it's
380    /// caller's responsibility to propose an empty conf change again to force
381    /// leaving joint state.
382    pub fn propose_conf_change(&mut self, context: Vec<u8>, cc: impl ConfChangeI) -> Result<()> {
383        let (data, ty) = if let Some(cc) = cc.as_v1() {
384            (cc.write_to_bytes()?, EntryType::EntryConfChange)
385        } else {
386            (cc.as_v2().write_to_bytes()?, EntryType::EntryConfChangeV2)
387        };
388        let mut m = Message::default();
389        m.set_msg_type(MessageType::MsgPropose);
390        let mut e = Entry::default();
391        e.set_entry_type(ty);
392        e.data = data.into();
393        e.context = context.into();
394        m.set_entries(vec![e].into());
395        self.raft.step(m)
396    }
397
398    /// Applies a config change to the local node. The app must call this when it
399    /// applies a configuration change, except when it decides to reject the
400    /// configuration change, in which case no call must take place.
401    pub fn apply_conf_change(&mut self, cc: &impl ConfChangeI) -> Result<ConfState> {
402        self.raft.apply_conf_change(&cc.as_v2())
403    }
404
405    /// Step advances the state machine using the given message.
406    pub fn step(&mut self, m: Message) -> Result<()> {
407        // Ignore unexpected local messages receiving over network
408        if is_local_msg(m.get_msg_type()) {
409            return Err(Error::StepLocalMsg);
410        }
411        if self.raft.prs().get(m.from).is_some() || !is_response_msg(m.get_msg_type()) {
412            return self.raft.step(m);
413        }
414        Err(Error::StepPeerNotFound)
415    }
416
417    /// A callback when entries are fetched asynchronously.
418    /// The context should provide the context passed from Storage.entries().
419    /// See more in the comment of Storage.entries().
420    ///
421    /// # Panics
422    ///
423    /// Panics if passed with the context of context.can_async() == false
424    pub fn on_entries_fetched(&mut self, context: GetEntriesContext) {
425        match context.0 {
426            GetEntriesFor::SendAppend {
427                to,
428                term,
429                aggressively,
430            } => {
431                if self.raft.term != term || self.raft.state != StateRole::Leader {
432                    // term or leadership has changed
433                    return;
434                }
435                if self.raft.prs().get(to).is_none() {
436                    // the peer has been removed, do nothing
437                    return;
438                }
439
440                if aggressively {
441                    self.raft.send_append_aggressively(to)
442                } else {
443                    self.raft.send_append(to)
444                }
445            }
446            GetEntriesFor::Empty(can_async) if can_async => {}
447            _ => panic!("shouldn't call callback on non-async context"),
448        }
449    }
450
451    /// Generates a LightReady that has the committed entries and messages but no commit index.
452    fn gen_light_ready(&mut self) -> LightReady {
453        let mut rd = LightReady::default();
454        let max_size = Some(self.raft.max_committed_size_per_ready);
455        let raft = &mut self.raft;
456        rd.committed_entries = raft
457            .raft_log
458            .next_entries_since(self.commit_since_index, max_size)
459            .unwrap_or_default();
460        // Update raft uncommitted entries size
461        raft.reduce_uncommitted_size(&rd.committed_entries);
462        if let Some(e) = rd.committed_entries.last() {
463            assert!(self.commit_since_index < e.get_index());
464            self.commit_since_index = e.get_index();
465        }
466
467        if !raft.msgs.is_empty() {
468            rd.messages = mem::take(&mut raft.msgs);
469        }
470
471        rd
472    }
473
474    /// Returns the outstanding work that the application needs to handle.
475    ///
476    /// This includes appending and applying entries or a snapshot, updating the HardState,
477    /// and sending messages. The returned `Ready` *MUST* be handled and subsequently
478    /// passed back via `advance` or its families. Before that, *DO NOT* call any function like
479    /// `step`, `propose`, `campaign` to change internal state.
480    ///
481    /// [`Self::has_ready`] should be called first to check if it's necessary to handle the ready.
482    pub fn ready(&mut self) -> Ready {
483        let raft = &mut self.raft;
484
485        self.max_number += 1;
486        let mut rd = Ready {
487            number: self.max_number,
488            ..Default::default()
489        };
490        let mut rd_record = ReadyRecord {
491            number: self.max_number,
492            ..Default::default()
493        };
494
495        if self.prev_ss.raft_state != StateRole::Leader && raft.state == StateRole::Leader {
496            // The vote msg which makes this peer become leader has been sent after persisting.
497            // So the remaining records must be generated during being candidate which can not
498            // have last_entry and snapshot(if so, it should become follower).
499            for record in self.records.drain(..) {
500                assert_eq!(record.last_entry, None);
501                assert_eq!(record.snapshot, None);
502            }
503        }
504
505        let ss = raft.soft_state();
506        if ss != self.prev_ss {
507            rd.ss = Some(ss);
508        }
509        let hs = raft.hard_state();
510        if hs != self.prev_hs {
511            if hs.vote != self.prev_hs.vote || hs.term != self.prev_hs.term {
512                rd.must_sync = true;
513            }
514            rd.hs = Some(hs);
515        }
516
517        if !raft.read_states.is_empty() {
518            rd.read_states = mem::take(&mut raft.read_states);
519        }
520
521        if let Some(snapshot) = &raft.raft_log.unstable_snapshot() {
522            rd.snapshot = snapshot.clone();
523            assert!(self.commit_since_index <= rd.snapshot.get_metadata().index);
524            self.commit_since_index = rd.snapshot.get_metadata().index;
525            // If there is a snapshot, the latter entries can not be persisted
526            // so there is no committed entries.
527            assert!(
528                !raft
529                    .raft_log
530                    .has_next_entries_since(self.commit_since_index),
531                "has snapshot but also has committed entries since {}",
532                self.commit_since_index
533            );
534            rd_record.snapshot = Some((
535                rd.snapshot.get_metadata().index,
536                rd.snapshot.get_metadata().term,
537            ));
538            rd.must_sync = true;
539        }
540
541        rd.entries = raft.raft_log.unstable_entries().to_vec();
542        if let Some(e) = rd.entries.last() {
543            // If the last entry exists, the entries must not empty, vice versa.
544            rd.must_sync = true;
545            rd_record.last_entry = Some((e.get_index(), e.get_term()));
546        }
547
548        // Leader can send messages immediately to make replication concurrently.
549        // For more details, check raft thesis 10.2.1.
550        rd.is_persisted_msg = raft.state != StateRole::Leader;
551        rd.light = self.gen_light_ready();
552        self.records.push_back(rd_record);
553        rd
554    }
555
556    /// HasReady called when RawNode user need to check if any Ready pending.
557    pub fn has_ready(&self) -> bool {
558        let raft = &self.raft;
559        if !raft.msgs.is_empty() {
560            return true;
561        }
562
563        if raft.soft_state() != self.prev_ss {
564            return true;
565        }
566        if raft.hard_state() != self.prev_hs {
567            return true;
568        }
569
570        if !raft.read_states.is_empty() {
571            return true;
572        }
573
574        if !raft.raft_log.unstable_entries().is_empty() {
575            return true;
576        }
577
578        if self.snap().map_or(false, |s| !s.is_empty()) {
579            return true;
580        }
581
582        if raft
583            .raft_log
584            .has_next_entries_since(self.commit_since_index)
585        {
586            return true;
587        }
588
589        false
590    }
591
592    fn commit_ready(&mut self, rd: Ready) {
593        if let Some(ss) = rd.ss {
594            self.prev_ss = ss;
595        }
596        if let Some(hs) = rd.hs {
597            self.prev_hs = hs;
598        }
599        let rd_record = self.records.back().unwrap();
600        assert!(rd_record.number == rd.number);
601        let raft = &mut self.raft;
602        if let Some((index, _)) = rd_record.snapshot {
603            raft.raft_log.stable_snap(index);
604        }
605        if let Some((index, term)) = rd_record.last_entry {
606            raft.raft_log.stable_entries(index, term);
607        }
608    }
609
610    fn commit_apply(&mut self, applied: u64) {
611        self.raft.commit_apply(applied);
612    }
613
614    /// Notifies that the ready of this number has been persisted.
615    ///
616    /// Since Ready must be persisted in order, calling this function implicitly means
617    /// all readies with numbers smaller than this one have been persisted.
618    ///
619    /// [`Self::has_ready`] and [`Self::ready`] should be called later to handle further
620    /// updates that become valid after ready being persisted.
621    pub fn on_persist_ready(&mut self, number: u64) {
622        let (mut index, mut term) = (0, 0);
623        let mut snap_index = 0;
624        while let Some(record) = self.records.front() {
625            if record.number > number {
626                break;
627            }
628            let record = self.records.pop_front().unwrap();
629
630            if let Some((i, _)) = record.snapshot {
631                snap_index = i;
632                index = 0;
633                term = 0;
634            }
635
636            if let Some((i, t)) = record.last_entry {
637                index = i;
638                term = t;
639            }
640        }
641        if snap_index != 0 {
642            self.raft.on_persist_snap(snap_index);
643        }
644        if index != 0 {
645            self.raft.on_persist_entries(index, term);
646        }
647    }
648
649    /// Advances the ready after fully processing it.
650    ///
651    /// Fully processing a ready requires to persist snapshot, entries and hard states, apply all
652    /// committed entries, send all messages.
653    ///
654    /// Returns the LightReady that contains commit index, committed entries and messages. [`LightReady`]
655    /// contains updates that are only valid after persisting last ready. It should also be fully processed.
656    /// Then [`Self::advance_apply`] or [`Self::advance_apply_to`] should be used later to update applying
657    /// progress.
658    pub fn advance(&mut self, rd: Ready) -> LightReady {
659        let applied = self.commit_since_index;
660        let light_rd = self.advance_append(rd);
661        self.advance_apply_to(applied);
662        light_rd
663    }
664
665    /// Advances the ready without applying committed entries. [`Self::advance_apply`] or
666    /// [`Self::advance_apply_to`] should be used later to update applying progress.
667    ///
668    /// Returns the LightReady that contains commit index, committed entries and messages.
669    ///
670    /// Since Ready must be persisted in order, calling this function implicitly means
671    /// all ready collected before have been persisted.
672    #[inline]
673    pub fn advance_append(&mut self, rd: Ready) -> LightReady {
674        self.commit_ready(rd);
675        self.on_persist_ready(self.max_number);
676        let mut light_rd = self.gen_light_ready();
677        if self.raft.state != StateRole::Leader && !light_rd.messages().is_empty() {
678            self.raft
679                .logger
680                .fatal("not leader but has new msg after advance");
681        }
682        // Set commit index if it's updated
683        let hard_state = self.raft.hard_state();
684        if hard_state.commit > self.prev_hs.commit {
685            light_rd.commit_index = Some(hard_state.commit);
686            self.prev_hs.commit = hard_state.commit;
687        } else {
688            assert!(hard_state.commit == self.prev_hs.commit);
689            light_rd.commit_index = None;
690        }
691        assert_eq!(hard_state, self.prev_hs, "hard state != prev_hs");
692        light_rd
693    }
694
695    /// Same as [`Self::advance_append`] except that it allows to only store the updates in cache.
696    /// [`Self::on_persist_ready`] should be used later to update the persisting progress.
697    ///
698    /// Raft works on an assumption persisted updates should not be lost, which usually requires expensive
699    /// operations like `fsync`. `advance_append_async` allows you to control the rate of such operations and
700    /// get a reasonable batch size. However, it's still required that the updates can be read by raft from the
701    /// `Storage` trait before calling `advance_append_async`.
702    #[inline]
703    pub fn advance_append_async(&mut self, rd: Ready) {
704        self.commit_ready(rd);
705    }
706
707    /// Advance apply to the index of the last committed entries given before.
708    #[inline]
709    pub fn advance_apply(&mut self) {
710        self.commit_apply(self.commit_since_index);
711    }
712
713    /// Advance apply to the passed index.
714    #[inline]
715    pub fn advance_apply_to(&mut self, applied: u64) {
716        self.commit_apply(applied);
717    }
718
719    /// Grabs the snapshot from the raft if available.
720    #[inline]
721    pub fn snap(&self) -> Option<&Snapshot> {
722        self.raft.snap()
723    }
724
725    /// Status returns the current status of the given group.
726    #[inline]
727    pub fn status(&self) -> Status {
728        Status::new(&self.raft)
729    }
730
731    /// ReportUnreachable reports the given node is not reachable for the last send.
732    pub fn report_unreachable(&mut self, id: u64) {
733        let mut m = Message::default();
734        m.set_msg_type(MessageType::MsgUnreachable);
735        m.from = id;
736        // we don't care if it is ok actually
737        let _ = self.raft.step(m);
738    }
739
740    /// ReportSnapshot reports the status of the sent snapshot.
741    pub fn report_snapshot(&mut self, id: u64, status: SnapshotStatus) {
742        let rej = status == SnapshotStatus::Failure;
743        let mut m = Message::default();
744        m.set_msg_type(MessageType::MsgSnapStatus);
745        m.from = id;
746        m.reject = rej;
747        // we don't care if it is ok actually
748        let _ = self.raft.step(m);
749    }
750
751    /// Request a snapshot from a leader.
752    /// The snapshot's index must be greater or equal to the request_index (last_index) or
753    /// the leader's term must be greater than the request term (last_index's term).
754    pub fn request_snapshot(&mut self) -> Result<()> {
755        self.raft.request_snapshot()
756    }
757
758    /// TransferLeader tries to transfer leadership to the given transferee.
759    pub fn transfer_leader(&mut self, transferee: u64) {
760        let mut m = Message::default();
761        m.set_msg_type(MessageType::MsgTransferLeader);
762        m.from = transferee;
763        let _ = self.raft.step(m);
764    }
765
766    /// ReadIndex requests a read state. The read state will be set in ready.
767    /// Read State has a read index. Once the application advances further than the read
768    /// index, any linearizable read requests issued before the read request can be
769    /// processed safely. The read state will have the same rctx attached.
770    pub fn read_index(&mut self, rctx: Vec<u8>) {
771        let mut m = Message::default();
772        m.set_msg_type(MessageType::MsgReadIndex);
773        let mut e = Entry::default();
774        e.data = rctx.into();
775        m.set_entries(vec![e].into());
776        let _ = self.raft.step(m);
777    }
778
779    /// Returns the store as an immutable reference.
780    #[inline]
781    pub fn store(&self) -> &T {
782        self.raft.store()
783    }
784
785    /// Returns the store as a mutable reference.
786    #[inline]
787    pub fn mut_store(&mut self) -> &mut T {
788        self.raft.mut_store()
789    }
790
791    /// Set whether skip broadcast empty commit messages at runtime.
792    #[inline]
793    pub fn skip_bcast_commit(&mut self, skip: bool) {
794        self.raft.skip_bcast_commit(skip)
795    }
796
797    /// Set whether to batch append msg at runtime.
798    #[inline]
799    pub fn set_batch_append(&mut self, batch_append: bool) {
800        self.raft.set_batch_append(batch_append)
801    }
802}
803
804#[cfg(test)]
805mod test {
806    use crate::eraftpb::MessageType;
807
808    use super::is_local_msg;
809
810    #[test]
811    fn test_is_local_msg() {
812        let tests = vec![
813            (MessageType::MsgHup, true),
814            (MessageType::MsgBeat, true),
815            (MessageType::MsgUnreachable, true),
816            (MessageType::MsgSnapStatus, true),
817            (MessageType::MsgCheckQuorum, true),
818            (MessageType::MsgPropose, false),
819            (MessageType::MsgAppend, false),
820            (MessageType::MsgAppendResponse, false),
821            (MessageType::MsgRequestVote, false),
822            (MessageType::MsgRequestVoteResponse, false),
823            (MessageType::MsgSnapshot, false),
824            (MessageType::MsgHeartbeat, false),
825            (MessageType::MsgHeartbeatResponse, false),
826            (MessageType::MsgTransferLeader, false),
827            (MessageType::MsgTimeoutNow, false),
828            (MessageType::MsgReadIndex, false),
829            (MessageType::MsgReadIndexResp, false),
830            (MessageType::MsgRequestPreVote, false),
831            (MessageType::MsgRequestPreVoteResponse, false),
832        ];
833        for (msg_type, result) in tests {
834            assert_eq!(is_local_msg(msg_type), result);
835        }
836    }
837}