mephisto_raft/
lib.rs

1// Copyright 2023 CratesLand Contributors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! ## Creating a Raft node
16//!
17//! You can use [`RawNode::new`] to create the Raft node. To create the Raft node, you need to
18//! provide a [`Storage`] component, and a [`Config`] to the [`RawNode::new`] function.
19//!
20//! ```rust
21//! use mephisto_raft::{raw_node::RawNode, storage::MemStorage, Config};
22//!
23//! // Select some defaults, then change what we need.
24//! let config = Config {
25//!     id: 1,
26//!     ..Default::default()
27//! };
28//! // ... Make any configuration changes.
29//! // After, make sure it's valid!
30//! config.validate().unwrap();
31//! // We'll use the built-in `MemStorage`, but you will likely want your own.
32//! // Finally, create our Raft node!
33//! let storage = MemStorage::new_with_conf_state((vec![1], vec![]));
34//! let mut node = RawNode::new(&config, storage).unwrap();
35//! ```
36//!
37//! ## Ticking the Raft node
38//!
39//! Use a timer to tick the Raft node at regular intervals. See the following example using Rust
40//! channel `recv_timeout` to drive the Raft node at least every 100ms, calling
41//! [`tick()`](RawNode::tick) each time.
42//!
43//! ```rust
44//! # use mephisto_raft::{Config, storage::MemStorage, raw_node::RawNode};
45//! # let config = Config { id: 1, ..Default::default() };
46//! # let store = MemStorage::new_with_conf_state((vec![1], vec![]));
47//! # let mut node = RawNode::new(&config, store).unwrap();
48//! # node.raft.become_candidate();
49//! # node.raft.become_leader();
50//! use std::{
51//!     sync::mpsc::{channel, RecvTimeoutError},
52//!     time::{Duration, Instant},
53//! };
54//!
55//! // We're using a channel, but this could be any stream of events.
56//! let (tx, rx) = channel();
57//! let timeout = Duration::from_millis(100);
58//! let mut remaining_timeout = timeout;
59//!
60//! // Send the `tx` somewhere else...
61//!
62//! loop {
63//!     let now = Instant::now();
64//!
65//!     match rx.recv_timeout(remaining_timeout) {
66//!         Ok(()) => {
67//!             // Let's save this for later.
68//!             unimplemented!()
69//!         }
70//!         Err(RecvTimeoutError::Timeout) => (),
71//!         Err(RecvTimeoutError::Disconnected) => unimplemented!(),
72//!     }
73//!
74//!     let elapsed = now.elapsed();
75//!     if elapsed >= remaining_timeout {
76//!         remaining_timeout = timeout;
77//!         // We drive Raft every 100ms.
78//!         node.tick();
79//!     } else {
80//!         remaining_timeout -= elapsed;
81//!     }
82//! #    break;
83//! }
84//! ```
85//!
86//! ## Proposing to, and stepping the Raft node
87//!
88//! Using the `propose` function you can drive the Raft node when the client sends a request to the
89//! Raft server. You can call `propose` to add the request to the Raft log explicitly.
90//!
//! In most cases, the client needs to wait for a response to its request. For example, a client
//! that writes a value to a key wants to know whether the write succeeded. In Raft, however, the
//! write flow is asynchronous: the log entry must first be replicated to the followers, then
//! committed, and finally applied to the state machine. So we need a way to notify the client
//! once the write has finished.
96//!
97//! One simple way is to use a unique ID for the client request, and save the associated callback
98//! function in a hash map. When the log entry is applied, we can get the ID from the decoded entry,
99//! call the corresponding callback, and notify the client.
100//!
101//! You can call the `step` function when you receive the Raft messages from other nodes.
102//!
103//! Here is a simple example to use `propose` and `step`:
104//!
105//! ```rust
106//! # use mephisto_raft::{Config, storage::MemStorage, raw_node::RawNode, eraftpb::Message};
107//! # use std::{
108//! #     sync::mpsc::{channel, RecvTimeoutError},
109//! #     time::{Instant, Duration},
110//! #     collections::HashMap
111//! # };
112//! #
113//! # let config = Config { id: 1, ..Default::default() };
114//! # let store = MemStorage::new_with_conf_state((vec![1], vec![]));
115//! # let mut node = RawNode::new(&config, store).unwrap();
116//! # node.raft.become_candidate();
117//! # node.raft.become_leader();
118//! #
119//! # let (tx, rx) = channel();
120//! # let timeout = Duration::from_millis(100);
121//! # let mut remaining_timeout = timeout;
122//! #
123//! enum Msg {
124//!     Propose {
125//!         id: u8,
126//!         callback: Box<dyn Fn() + Send>,
127//!     },
128//!     Raft(Message),
129//! }
130//!
131//! // Simulate a message coming down the stream.
132//! tx.send(Msg::Propose {
133//!     id: 1,
134//!     callback: Box::new(|| ()),
135//! })
136//! .unwrap();
137//!
138//! let mut cbs = HashMap::new();
139//! loop {
140//!     let now = Instant::now();
141//!
142//!     match rx.recv_timeout(remaining_timeout) {
143//!         Ok(Msg::Propose { id, callback }) => {
144//!             cbs.insert(id, callback);
145//!             node.propose(vec![], vec![id]).unwrap();
146//!         }
147//!         Ok(Msg::Raft(m)) => node.step(m).unwrap(),
148//!         Err(RecvTimeoutError::Timeout) => (),
149//!         Err(RecvTimeoutError::Disconnected) => unimplemented!(),
150//!     }
151//!
152//!     let elapsed = now.elapsed();
153//!     if elapsed >= remaining_timeout {
154//!         remaining_timeout = timeout;
155//!         // We drive Raft every 100ms.
156//!         node.tick();
157//!     } else {
158//!         remaining_timeout -= elapsed;
159//!     }
//! #    break;
161//! }
162//! ```
163//!
164//! In the above example, we use a channel to receive the `propose` and `step` messages. We only
165//! propose the request ID to the Raft log. In your own practice, you can embed the ID in your
166//! request and propose the encoded binary request data.
167//!
168//! ## Processing the `Ready` State
169//!
170//! When your Raft node is ticked and running, Raft should enter a `Ready` state. You need to first
171//! use `has_ready` to check whether Raft is ready. If yes, use the `ready` function to get a
172//! `Ready` state:
173//!
174//! ```rust
175//! # use mephisto_raft::{Config, storage::MemStorage, raw_node::RawNode};
176//! #
177//! # let config = Config { id: 1, ..Default::default() };
178//! # config.validate().unwrap();
179//! # let store = MemStorage::new_with_conf_state((vec![1], vec![]));
180//! # let mut node = RawNode::new(&config, store).unwrap();
181//! #
182//! if !node.has_ready() {
183//!     return;
184//! }
185//!
186//! // The Raft is ready, we can do something now.
187//! let mut ready = node.ready();
188//! ```
189//!
//! The `Ready` state contains quite a bit of information, and you need to check and process each
//! part of it in turn:
192//!
193//! 1. Check whether `messages` is empty or not. If not, it means that the node will send messages
194//! to other nodes:
195//!
196//!     ```rust
197//!     # use mephisto_raft::{Config, storage::MemStorage, raw_node::RawNode, StateRole};
198//!     #
199//!     # let config = Config { id: 1, ..Default::default() };
200//!     # config.validate().unwrap();
201//!     # let store = MemStorage::new_with_conf_state((vec![1], vec![]));
202//!     # let mut node = RawNode::new(&config, store).unwrap();
203//!     #
204//!     # if !node.has_ready() {
205//!     #   return;
206//!     # }
207//!     # let mut ready = node.ready();
208//!     #
209//!     if !ready.messages().is_empty() {
210//!         for msg in ready.take_messages() {
211//!             // Send messages to other peers.
212//!         }
213//!     }
214//!     ```
215//!
216//! 2. Check whether `snapshot` is empty or not. If not empty, it means that the Raft node has
217//! received a Raft snapshot from the leader and we must apply the snapshot:
218//!
219//!     ```rust
220//!     # use mephisto_raft::{Config, storage::MemStorage, raw_node::RawNode};
221//!     #
222//!     # let config = Config { id: 1, ..Default::default() };
223//!     # config.validate().unwrap();
224//!     # let store = MemStorage::new_with_conf_state((vec![1], vec![]));
225//!     # let mut node = RawNode::new(&config, store).unwrap();
226//!     #
227//!     # if !node.has_ready() {
228//!     #   return;
229//!     # }
230//!     # let mut ready = node.ready();
231//!     #
232//!     if !ready.snapshot().is_empty() {
233//!         // This is a snapshot, we need to apply the snapshot at first.
234//!         node.mut_store()
235//!             .wl()
236//!             .apply_snapshot(ready.snapshot().clone())
237//!             .unwrap();
238//!     }
239//!
240//!     ```
241//!
242//! 3. Check whether `committed_entries` is empty or not. If not, it means that there are some newly
243//! committed log entries which you must apply to the state machine. Of course, after applying, you
244//! need to update the applied index and resume `apply` later:
245//!
246//!     ```rust
247//!     # use mephisto_raft::{Config, storage::MemStorage, raw_node::RawNode, eraftpb};
248//!     #
249//!     # let config = Config { id: 1, ..Default::default() };
250//!     # config.validate().unwrap();
251//!     # let store = MemStorage::new_with_conf_state((vec![1], vec![]));
252//!     # let mut node = RawNode::new(&config, store).unwrap();
253//!     #
254//!     # if !node.has_ready() {
255//!     #   return;
256//!     # }
257//!     # let mut ready = node.ready();
258//!     #
259//!     # fn handle_conf_change(e: eraftpb::Entry) {
260//!     # }
261//!     #
262//!     # fn handle_normal(e: eraftpb::Entry) {
263//!     # }
264//!     #
265//!     let mut _last_apply_index = 0;
266//!     for entry in ready.take_committed_entries() {
267//!         // Mostly, you need to save the last apply index to resume applying
268//!         // after restart. Here we just ignore this because we use a Memory storage.
269//!         _last_apply_index = entry.index;
270//!
271//!         if entry.data.is_empty() {
//!             // Empty entry: when the peer becomes leader it sends an empty entry.
273//!             continue;
274//!         }
275//!
276//!         match entry.entry_type() {
277//!             eraftpb::EntryType::EntryNormal => handle_normal(entry),
278//!             eraftpb::EntryType::EntryConfChange => handle_conf_change(entry),
279//!         }
280//!     }
281//!     ```
282//!
//!     Note: although Raft guarantees that only persisted committed entries will be applied,
//!     it doesn't guarantee that the commit index is persisted before the entries are applied.
//!     For example, if the application restarts after applying committed entries but before
//!     persisting the commit index, the applied index can be larger than the commit index,
//!     which causes a panic. To solve this, persist the commit index together with, or before,
//!     applying the entries. Alternatively, you can always set the commit index to
//!     `max(commit_index, applied_index)` after restarting; *this may work, but potential log
//!     loss may then be ignored silently*.
290//!
//! 4. Check whether `entries` is empty or not. If not empty, it means that there are newly added
//! entries that have not been committed yet, and we must append them to the Raft log:
293//!
294//!     ```rust
295//!     # use mephisto_raft::{Config, storage::MemStorage, raw_node::RawNode};
296//!     #
297//!     # let config = Config { id: 1, ..Default::default() };
298//!     # config.validate().unwrap();
299//!     # let store = MemStorage::new_with_conf_state((vec![1], vec![]));
300//!     # let mut node = RawNode::new(&config, store).unwrap();
301//!     #
302//!     # if !node.has_ready() {
303//!     #   return;
304//!     # }
305//!     # let mut ready = node.ready();
306//!     #
307//!     if !ready.entries().is_empty() {
308//!         // Append entries to the Raft log
309//!         node.mut_store().wl().append(ready.entries()).unwrap();
310//!     }
311//!
312//!     ```
313//!
314//! 5. Check whether `hs` is empty or not. If not empty, it means that the `HardState` of the node
315//! has changed. For example, the node may vote for a new leader, or the commit index has been
316//! increased. We must persist the changed `HardState`:
317//!
318//!     ```rust
319//!     # use mephisto_raft::{Config, storage::MemStorage, raw_node::RawNode};
320//!     #
321//!     # let config = Config { id: 1, ..Default::default() };
322//!     # config.validate().unwrap();
323//!     # let store = MemStorage::new_with_conf_state((vec![1], vec![]));
324//!     # let mut node = RawNode::new(&config, store).unwrap();
325//!     #
326//!     # if !node.has_ready() {
327//!     #   return;
328//!     # }
329//!     # let mut ready = node.ready();
330//!     #
331//!     if let Some(hs) = ready.hs() {
332//!         // Raft HardState changed, and we need to persist it.
333//!         node.mut_store().wl().set_hard_state(hs.clone());
334//!     }
335//!     ```
336//!
337//! 6. Check whether `persisted_messages` is empty or not. If not, it means that the node will send
338//! messages to other nodes after persisting hardstate, entries and snapshot:
339//!
340//!     ```rust
341//!     # use mephisto_raft::{Config, storage::MemStorage, raw_node::RawNode, StateRole};
342//!     #
343//!     # let config = Config { id: 1, ..Default::default() };
344//!     # config.validate().unwrap();
345//!     # let store = MemStorage::new_with_conf_state((vec![1], vec![]));
346//!     # let mut node = RawNode::new(&config, store).unwrap();
347//!     #
348//!     # if !node.has_ready() {
349//!     #   return;
350//!     # }
351//!     # let mut ready = node.ready();
352//!     #
353//!     if !ready.persisted_messages().is_empty() {
354//!         for msg in ready.take_persisted_messages() {
355//!             // Send persisted messages to other peers.
356//!         }
357//!     }
358//!     ```
359//!
//! 7. Call `advance` to notify that the previous work is completed. Take the returned
//! `LightReady` and handle its `messages` and `committed_entries` as in steps 1 and 3. Then
//! call `advance_apply` to advance the applied index inside Raft.
363//!
364//!     ```rust
365//!     # use mephisto_raft::{Config, storage::MemStorage, raw_node::RawNode};
366//!     # use mephisto_raft::eraftpb::{EntryType, Entry, Message};
367//!     #
368//!     # let config = Config { id: 1, ..Default::default() };
369//!     # config.validate().unwrap();
370//!     # let store = MemStorage::new_with_conf_state((vec![1], vec![]));
371//!     # let mut node = RawNode::new(&config, store).unwrap();
372//!     #
373//!     # if !node.has_ready() {
374//!     #   return;
375//!     # }
376//!     # let mut ready = node.ready();
377//!     #
378//!     # fn handle_messages(msgs: Vec<Message>) {
379//!     # }
380//!     #
381//!     # fn handle_committed_entries(committed_entries: Vec<Entry>) {
382//!     # }
383//!     let mut light_rd = node.advance(ready);
384//!     // Like step 1 and 3, you can use functions to make them behave the same.
385//!     handle_messages(light_rd.take_messages());
386//!     handle_committed_entries(light_rd.take_committed_entries());
387//!     node.advance_apply();
388//!     ```
389//!
//! Sometimes it's better not to block the Raft node on IO operations, so that read/write latency
//! is more predictable and the fsync frequency can be controlled. The crate supports async ready
//! to offload IO operations to another thread. The usage is the same as above, except:
//!
//! 1. Writes are not required to be persisted immediately; they can be written into memory
//!    caches.
//! 2. Persisted messages should be sent only after all corresponding writes are persisted.
//! 3. [`advance_append_async`](RawNode::advance_append_async) is used when all writes are finished,
//!    instead of `advance`/`advance_append`.
//! 4. Only persisted entries can be committed and applied, so to make progress, all writes should
//!    be persisted at some point.
400//!
401//! ## Arbitrary Membership Changes
402//!
403//! When building a resilient, scalable distributed system there is a strong need to be able to
404//! change the membership of a peer group *dynamically, without downtime.* This Raft crate supports
405//! this via **Joint Consensus**
406//! ([Raft paper, section 6](https://web.stanford.edu/~ouster/cgi-bin/papers/raft-atc14)).
407//!
408//! It permits resilient arbitrary dynamic membership changes. A membership change can do any or all
409//! of the following:
410//!
411//! * Add peer (learner or voter) *n* to the group.
412//! * Remove a learner *n* from the group.
413//! * Promote a learner to a voter.
414//! * Demote a voter back to learner.
415//! * Replace a node *n* with another node *m*.
416//!
//! For example, to promote learner 4 to a voter and remove the existing voter 3:
418//! ```no_run
419//! # use mephisto_raft::{Config, eraftpb::*, proto, raw_node::RawNode, storage::MemStorage};
420//! #
421//! # let mut config = Config { id: 1, ..Default::default() };
422//! # let store = MemStorage::new_with_conf_state((vec![1, 2], vec![]));
423//! # let mut node = RawNode::new(&mut config, store).unwrap();
424//! let steps = vec![
425//!     proto::new_conf_change_single(4, ConfChangeType::AddNode),
426//!     proto::new_conf_change_single(3, ConfChangeType::RemoveNode),
427//! ];
428//! let mut cc = ConfChange {
429//!     changes: steps,
430//!     ..Default::default()
431//! };
432//! node.propose_conf_change(vec![], cc).unwrap();
433//! // After the log is committed and applied
434//! // node.apply_conf_change(&cc).unwrap();
435//! ```
436//!
//! This is a two-phase process; while it is in progress, the peer group's leader manages
//! **two independent, possibly overlapping peer sets**.
//!
//! > **Note:** In order to maintain resiliency guarantees (progress while a majority of both peer
//! > sets is active), it is recommended to wait until the entire peer group has exited the
//! > transition phase before taking old, removed peers offline.
444
445#![feature(panic_update_hook)]
446#![deny(clippy::all)]
447#![deny(missing_docs)]
448
449pub use confchange::{Changer, MapChange};
450pub use config::Config;
451pub use errors::{Error, Result, StorageError};
452pub use log_unstable::Unstable;
453pub use proto::eraftpb;
454pub use quorum::{joint::Configuration as JointConfig, majority::Configuration as MajorityConfig};
455pub use raft_log::{RaftLog, NO_LIMIT};
456pub use raw_node::{LightReady, Peer, RawNode, Ready, SnapshotStatus};
457pub use read_only::{ReadOnlyOption, ReadState};
458pub use status::Status;
459pub use storage::{GetEntriesContext, RaftState, Storage};
460pub use tracker::{Inflights, Progress, ProgressState, ProgressTracker};
461pub use util::majority;
462
463pub use crate::raft::{
464    vote_resp_msg_type, Raft, SoftState, StateRole, CAMPAIGN_ELECTION, CAMPAIGN_PRE_ELECTION,
465    CAMPAIGN_TRANSFER, INVALID_ID, INVALID_INDEX,
466};
467
468mod confchange;
469mod config;
470mod errors;
471mod log_unstable;
472pub mod proto;
473mod quorum;
474#[cfg(test)]
475pub mod raft;
476#[cfg(not(test))]
477mod raft;
478mod raft_log;
479pub mod raw_node;
480mod read_only;
481mod status;
482pub mod storage;
483mod tracker;
484pub mod util;
485
486pub mod prelude {
487    //! A "prelude" for crates using the `raft` crate.
488    //!
489    //! This prelude is similar to the standard library's prelude in that you'll
490    //! almost always want to import its entire contents, but unlike the standard
491    //! library's prelude you'll have to do so manually:
492    //!
493    //! ```
494    //! use mephisto_raft::prelude::*;
495    //! ```
496    //!
497    //! The prelude may grow over time as additional items see ubiquitous use.
498
499    pub use crate::{
500        config::Config,
501        proto::eraftpb::{
502            ConfChange, ConfChangeSingle, ConfChangeTransition, ConfChangeType, ConfState, Entry,
503            EntryType, HardState, Message, MessageType, Snapshot, SnapshotMetadata,
504        },
505        raft::Raft,
506        raw_node::{Peer, RawNode, Ready, SnapshotStatus},
507        read_only::{ReadOnlyOption, ReadState},
508        status::Status,
509        storage::{RaftState, Storage},
510        Progress,
511    };
512}
513
514type DefaultHashBuilder = std::hash::BuildHasherDefault<fxhash::FxHasher>;
515type HashMap<K, V> = std::collections::HashMap<K, V, DefaultHashBuilder>;
516type HashSet<K> = std::collections::HashSet<K, DefaultHashBuilder>;
517
/// Logs a fatal error at `error` level through `tracing`, then panics with the same message.
///
/// Accepts the same arguments as [`panic!`]: either nothing, or a format string followed by its
/// arguments. The format arguments are evaluated exactly once. The log line carries the
/// `file:line:column` of the invocation. The panic payload is the formatted message as a
/// `String`.
///
/// The message is logged directly rather than by installing a panic hook. The process-wide hook
/// would otherwise have three problems:
/// - It would outlive a panic that is caught (by `catch_unwind` or at a thread boundary) and
///   then log every later, unrelated panic.
/// - It would be stacked again on every invocation.
/// - Via a non-atomic `take_hook`/`set_hook` pair, it could drop a hook installed concurrently
///   by another thread.
///
/// The expansion refers to `tracing::error!` by a call-site path, so the invoking crate must
/// have `tracing` available.
///
/// # Panics
///
/// Always.
#[macro_export]
macro_rules! fatal {
    () => {
        $crate::fatal!("explicit panic")
    };
    ($($arg:tt)+) => {{
        // Format once so side-effecting or moved arguments are evaluated a single time.
        let msg = ::std::format!($($arg)+);
        tracing::error!(
            "fatal error at {}:{}:{}: {}",
            ::std::file!(),
            ::std::line!(),
            ::std::column!(),
            msg
        );
        ::std::panic!("{}", msg)
    }};
}