mephisto_raft/lib.rs
1// Copyright 2023 CratesLand Contributors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! ## Creating a Raft node
16//!
17//! You can use [`RawNode::new`] to create the Raft node. To create the Raft node, you need to
18//! provide a [`Storage`] component, and a [`Config`] to the [`RawNode::new`] function.
19//!
20//! ```rust
21//! use mephisto_raft::{raw_node::RawNode, storage::MemStorage, Config};
22//!
23//! // Select some defaults, then change what we need.
24//! let config = Config {
25//! id: 1,
26//! ..Default::default()
27//! };
28//! // ... Make any configuration changes.
29//! // After, make sure it's valid!
30//! config.validate().unwrap();
31//! // We'll use the built-in `MemStorage`, but you will likely want your own.
32//! // Finally, create our Raft node!
33//! let storage = MemStorage::new_with_conf_state((vec![1], vec![]));
34//! let mut node = RawNode::new(&config, storage).unwrap();
35//! ```
36//!
37//! ## Ticking the Raft node
38//!
39//! Use a timer to tick the Raft node at regular intervals. See the following example using Rust
40//! channel `recv_timeout` to drive the Raft node at least every 100ms, calling
41//! [`tick()`](RawNode::tick) each time.
42//!
43//! ```rust
44//! # use mephisto_raft::{Config, storage::MemStorage, raw_node::RawNode};
45//! # let config = Config { id: 1, ..Default::default() };
46//! # let store = MemStorage::new_with_conf_state((vec![1], vec![]));
47//! # let mut node = RawNode::new(&config, store).unwrap();
48//! # node.raft.become_candidate();
49//! # node.raft.become_leader();
50//! use std::{
51//! sync::mpsc::{channel, RecvTimeoutError},
52//! time::{Duration, Instant},
53//! };
54//!
55//! // We're using a channel, but this could be any stream of events.
56//! let (tx, rx) = channel();
57//! let timeout = Duration::from_millis(100);
58//! let mut remaining_timeout = timeout;
59//!
60//! // Send the `tx` somewhere else...
61//!
62//! loop {
63//! let now = Instant::now();
64//!
65//! match rx.recv_timeout(remaining_timeout) {
66//! Ok(()) => {
67//! // Let's save this for later.
68//! unimplemented!()
69//! }
70//! Err(RecvTimeoutError::Timeout) => (),
71//! Err(RecvTimeoutError::Disconnected) => unimplemented!(),
72//! }
73//!
74//! let elapsed = now.elapsed();
75//! if elapsed >= remaining_timeout {
76//! remaining_timeout = timeout;
77//! // We drive Raft every 100ms.
78//! node.tick();
79//! } else {
80//! remaining_timeout -= elapsed;
81//! }
82//! # break;
83//! }
84//! ```
85//!
86//! ## Proposing to, and stepping the Raft node
87//!
88//! Using the `propose` function you can drive the Raft node when the client sends a request to the
89//! Raft server. You can call `propose` to add the request to the Raft log explicitly.
90//!
91//! In most cases, the client needs to wait for a response to its request. For example, a client
92//! that writes a value to a key wants to know whether the write succeeded. However, the write
93//! flow in Raft is asynchronous: the log entry must be replicated to the followers, then
94//! committed, and finally applied to the state machine. We therefore need a way to notify the
95//! client after the write is finished.
96//!
97//! One simple way is to use a unique ID for the client request, and save the associated callback
98//! function in a hash map. When the log entry is applied, we can get the ID from the decoded entry,
99//! call the corresponding callback, and notify the client.
100//!
101//! You can call the `step` function when you receive the Raft messages from other nodes.
102//!
103//! Here is a simple example to use `propose` and `step`:
104//!
105//! ```rust
106//! # use mephisto_raft::{Config, storage::MemStorage, raw_node::RawNode, eraftpb::Message};
107//! # use std::{
108//! # sync::mpsc::{channel, RecvTimeoutError},
109//! # time::{Instant, Duration},
110//! # collections::HashMap
111//! # };
112//! #
113//! # let config = Config { id: 1, ..Default::default() };
114//! # let store = MemStorage::new_with_conf_state((vec![1], vec![]));
115//! # let mut node = RawNode::new(&config, store).unwrap();
116//! # node.raft.become_candidate();
117//! # node.raft.become_leader();
118//! #
119//! # let (tx, rx) = channel();
120//! # let timeout = Duration::from_millis(100);
121//! # let mut remaining_timeout = timeout;
122//! #
123//! enum Msg {
124//! Propose {
125//! id: u8,
126//! callback: Box<dyn Fn() + Send>,
127//! },
128//! Raft(Message),
129//! }
130//!
131//! // Simulate a message coming down the stream.
132//! tx.send(Msg::Propose {
133//! id: 1,
134//! callback: Box::new(|| ()),
135//! })
136//! .unwrap();
137//!
138//! let mut cbs = HashMap::new();
139//! loop {
140//! let now = Instant::now();
141//!
142//! match rx.recv_timeout(remaining_timeout) {
143//! Ok(Msg::Propose { id, callback }) => {
144//! cbs.insert(id, callback);
145//! node.propose(vec![], vec![id]).unwrap();
146//! }
147//! Ok(Msg::Raft(m)) => node.step(m).unwrap(),
148//! Err(RecvTimeoutError::Timeout) => (),
149//! Err(RecvTimeoutError::Disconnected) => unimplemented!(),
150//! }
151//!
152//! let elapsed = now.elapsed();
153//! if elapsed >= remaining_timeout {
154//! remaining_timeout = timeout;
155//! // We drive Raft every 100ms.
156//! node.tick();
157//! } else {
158//! remaining_timeout -= elapsed;
159//! }
160//! break;
161//! }
162//! ```
163//!
164//! In the above example, we use a channel to receive the `propose` and `step` messages. We only
165//! propose the request ID to the Raft log. In your own practice, you can embed the ID in your
166//! request and propose the encoded binary request data.
167//!
168//! ## Processing the `Ready` State
169//!
170//! When your Raft node is ticked and running, Raft should enter a `Ready` state. You need to first
171//! use `has_ready` to check whether Raft is ready. If yes, use the `ready` function to get a
172//! `Ready` state:
173//!
174//! ```rust
175//! # use mephisto_raft::{Config, storage::MemStorage, raw_node::RawNode};
176//! #
177//! # let config = Config { id: 1, ..Default::default() };
178//! # config.validate().unwrap();
179//! # let store = MemStorage::new_with_conf_state((vec![1], vec![]));
180//! # let mut node = RawNode::new(&config, store).unwrap();
181//! #
182//! if !node.has_ready() {
183//! return;
184//! }
185//!
186//! // The Raft is ready, we can do something now.
187//! let mut ready = node.ready();
188//! ```
189//!
190//! The `Ready` state contains quite a bit of information, and you need to check and process them
191//! one by one:
192//!
193//! 1. Check whether `messages` is empty or not. If not, it means that the node will send messages
194//! to other nodes:
195//!
196//! ```rust
197//! # use mephisto_raft::{Config, storage::MemStorage, raw_node::RawNode, StateRole};
198//! #
199//! # let config = Config { id: 1, ..Default::default() };
200//! # config.validate().unwrap();
201//! # let store = MemStorage::new_with_conf_state((vec![1], vec![]));
202//! # let mut node = RawNode::new(&config, store).unwrap();
203//! #
204//! # if !node.has_ready() {
205//! # return;
206//! # }
207//! # let mut ready = node.ready();
208//! #
209//! if !ready.messages().is_empty() {
210//! for msg in ready.take_messages() {
211//! // Send messages to other peers.
212//! }
213//! }
214//! ```
215//!
216//! 2. Check whether `snapshot` is empty or not. If not empty, it means that the Raft node has
217//! received a Raft snapshot from the leader and we must apply the snapshot:
218//!
219//! ```rust
220//! # use mephisto_raft::{Config, storage::MemStorage, raw_node::RawNode};
221//! #
222//! # let config = Config { id: 1, ..Default::default() };
223//! # config.validate().unwrap();
224//! # let store = MemStorage::new_with_conf_state((vec![1], vec![]));
225//! # let mut node = RawNode::new(&config, store).unwrap();
226//! #
227//! # if !node.has_ready() {
228//! # return;
229//! # }
230//! # let mut ready = node.ready();
231//! #
232//! if !ready.snapshot().is_empty() {
233//! // This is a snapshot, we need to apply the snapshot at first.
234//! node.mut_store()
235//! .wl()
236//! .apply_snapshot(ready.snapshot().clone())
237//! .unwrap();
238//! }
239//!
240//! ```
241//!
242//! 3. Check whether `committed_entries` is empty or not. If not, it means that there are some newly
243//! committed log entries which you must apply to the state machine. Of course, after applying, you
244//! need to update the applied index and resume `apply` later:
245//!
246//! ```rust
247//! # use mephisto_raft::{Config, storage::MemStorage, raw_node::RawNode, eraftpb};
248//! #
249//! # let config = Config { id: 1, ..Default::default() };
250//! # config.validate().unwrap();
251//! # let store = MemStorage::new_with_conf_state((vec![1], vec![]));
252//! # let mut node = RawNode::new(&config, store).unwrap();
253//! #
254//! # if !node.has_ready() {
255//! # return;
256//! # }
257//! # let mut ready = node.ready();
258//! #
259//! # fn handle_conf_change(e: eraftpb::Entry) {
260//! # }
261//! #
262//! # fn handle_normal(e: eraftpb::Entry) {
263//! # }
264//! #
265//! let mut _last_apply_index = 0;
266//! for entry in ready.take_committed_entries() {
267//! // Mostly, you need to save the last apply index to resume applying
268//! // after restart. Here we just ignore this because we use a Memory storage.
269//! _last_apply_index = entry.index;
270//!
271//! if entry.data.is_empty() {
272//! // Empty entry, when the peer becomes Leader it will send an empty entry.
273//! continue;
274//! }
275//!
276//! match entry.entry_type() {
277//! eraftpb::EntryType::EntryNormal => handle_normal(entry),
278//! eraftpb::EntryType::EntryConfChange => handle_conf_change(entry),
279//! }
280//! }
281//! ```
282//!
283//! Note: although Raft guarantees that only persisted committed entries will be applied,
284//! it doesn't guarantee that the commit index is persisted before being applied. For example,
285//! if the application is restarted after applying committed entries but before persisting
286//! the commit index, the apply index can be larger than the commit index and cause a panic. To
287//! solve the problem, persist the commit index with or before applying entries.
288//! You can also always assign the commit index to `max(commit_index, applied_index)`
289//! after restarting; *it may work, but potential log loss may also be ignored silently*.
290//!
291//! 4. Check whether `entries` is empty or not. If not empty, it means that there are newly added
292//! entries that have not been committed yet; we must append these entries to the Raft log:
293//!
294//! ```rust
295//! # use mephisto_raft::{Config, storage::MemStorage, raw_node::RawNode};
296//! #
297//! # let config = Config { id: 1, ..Default::default() };
298//! # config.validate().unwrap();
299//! # let store = MemStorage::new_with_conf_state((vec![1], vec![]));
300//! # let mut node = RawNode::new(&config, store).unwrap();
301//! #
302//! # if !node.has_ready() {
303//! # return;
304//! # }
305//! # let mut ready = node.ready();
306//! #
307//! if !ready.entries().is_empty() {
308//! // Append entries to the Raft log
309//! node.mut_store().wl().append(ready.entries()).unwrap();
310//! }
311//!
312//! ```
313//!
314//! 5. Check whether `hs` is empty or not. If not empty, it means that the `HardState` of the node
315//! has changed. For example, the node may vote for a new leader, or the commit index has been
316//! increased. We must persist the changed `HardState`:
317//!
318//! ```rust
319//! # use mephisto_raft::{Config, storage::MemStorage, raw_node::RawNode};
320//! #
321//! # let config = Config { id: 1, ..Default::default() };
322//! # config.validate().unwrap();
323//! # let store = MemStorage::new_with_conf_state((vec![1], vec![]));
324//! # let mut node = RawNode::new(&config, store).unwrap();
325//! #
326//! # if !node.has_ready() {
327//! # return;
328//! # }
329//! # let mut ready = node.ready();
330//! #
331//! if let Some(hs) = ready.hs() {
332//! // Raft HardState changed, and we need to persist it.
333//! node.mut_store().wl().set_hard_state(hs.clone());
334//! }
335//! ```
336//!
337//! 6. Check whether `persisted_messages` is empty or not. If not, it means that the node will send
338//! messages to other nodes after persisting hardstate, entries and snapshot:
339//!
340//! ```rust
341//! # use mephisto_raft::{Config, storage::MemStorage, raw_node::RawNode, StateRole};
342//! #
343//! # let config = Config { id: 1, ..Default::default() };
344//! # config.validate().unwrap();
345//! # let store = MemStorage::new_with_conf_state((vec![1], vec![]));
346//! # let mut node = RawNode::new(&config, store).unwrap();
347//! #
348//! # if !node.has_ready() {
349//! # return;
350//! # }
351//! # let mut ready = node.ready();
352//! #
353//! if !ready.persisted_messages().is_empty() {
354//! for msg in ready.take_persisted_messages() {
355//! // Send persisted messages to other peers.
356//! }
357//! }
358//! ```
359//!
360//! 7. Call `advance` to notify that the previous work is completed. Get the return value
361//! `LightReady` and handle its `messages` and `committed_entries` as in step 1 and step 3. Then
362//! call `advance_apply` to advance the applied index inside.
363//!
364//! ```rust
365//! # use mephisto_raft::{Config, storage::MemStorage, raw_node::RawNode};
366//! # use mephisto_raft::eraftpb::{EntryType, Entry, Message};
367//! #
368//! # let config = Config { id: 1, ..Default::default() };
369//! # config.validate().unwrap();
370//! # let store = MemStorage::new_with_conf_state((vec![1], vec![]));
371//! # let mut node = RawNode::new(&config, store).unwrap();
372//! #
373//! # if !node.has_ready() {
374//! # return;
375//! # }
376//! # let mut ready = node.ready();
377//! #
378//! # fn handle_messages(msgs: Vec<Message>) {
379//! # }
380//! #
381//! # fn handle_committed_entries(committed_entries: Vec<Entry>) {
382//! # }
383//! let mut light_rd = node.advance(ready);
384//! // Like step 1 and 3, you can use functions to make them behave the same.
385//! handle_messages(light_rd.take_messages());
386//! handle_committed_entries(light_rd.take_committed_entries());
387//! node.advance_apply();
388//! ```
389//!
390//! Sometimes it's better not to block the raft machine in IO operation, so that latency of
391//! read/write can be more predictable and the fsync frequency can be controlled. The crate
392//! supports async ready to offload the IO operation to other thread. The usage is the same as
393//! above except:
394//! 1. Writes are not required to be persisted immediately; they can be written into memory caches.
395//! 2. Persisted messages should be sent after all corresponding writes are persisted.
396//! 3. [`advance_append_async`](RawNode::advance_append_async) is used when all writes are finished
397//! instead of `advance/advance_append`.
398//! 4. Only persisted entries can be committed and applied, so to make progress, all writes should
399//! be persisted at some point.
400//!
401//! ## Arbitrary Membership Changes
402//!
403//! When building a resilient, scalable distributed system there is a strong need to be able to
404//! change the membership of a peer group *dynamically, without downtime.* This Raft crate supports
405//! this via **Joint Consensus**
406//! ([Raft paper, section 6](https://web.stanford.edu/~ouster/cgi-bin/papers/raft-atc14)).
407//!
408//! It permits resilient arbitrary dynamic membership changes. A membership change can do any or all
409//! of the following:
410//!
411//! * Add peer (learner or voter) *n* to the group.
412//! * Remove a learner *n* from the group.
413//! * Promote a learner to a voter.
414//! * Demote a voter back to learner.
415//! * Replace a node *n* with another node *m*.
416//!
417//! For example, to promote learner 4 to a voter and remove the existing voter 3:
418//! ```no_run
419//! # use mephisto_raft::{Config, eraftpb::*, proto, raw_node::RawNode, storage::MemStorage};
420//! #
421//! # let mut config = Config { id: 1, ..Default::default() };
422//! # let store = MemStorage::new_with_conf_state((vec![1, 2], vec![]));
423//! # let mut node = RawNode::new(&mut config, store).unwrap();
424//! let steps = vec![
425//! proto::new_conf_change_single(4, ConfChangeType::AddNode),
426//! proto::new_conf_change_single(3, ConfChangeType::RemoveNode),
427//! ];
428//! let mut cc = ConfChange {
429//! changes: steps,
430//! ..Default::default()
431//! };
432//! node.propose_conf_change(vec![], cc).unwrap();
433//! // After the log is committed and applied
434//! // node.apply_conf_change(&cc).unwrap();
435//! ```
436//!
437//! This is a two-phase process; in the midst of it, the peer group's leader is managing
438//! **two independent, possibly overlapping peer sets**.
439//!
440//! > **Note:** In order to maintain resiliency guarantees (progress while a
441//! > majority of both peer sets is active), it is recommended to wait until
442//! > the entire peer group has exited the transition phase before taking old,
443//! > removed peers offline.
444
445#![feature(panic_update_hook)]
446#![deny(clippy::all)]
447#![deny(missing_docs)]
448
449pub use confchange::{Changer, MapChange};
450pub use config::Config;
451pub use errors::{Error, Result, StorageError};
452pub use log_unstable::Unstable;
453pub use proto::eraftpb;
454pub use quorum::{joint::Configuration as JointConfig, majority::Configuration as MajorityConfig};
455pub use raft_log::{RaftLog, NO_LIMIT};
456pub use raw_node::{LightReady, Peer, RawNode, Ready, SnapshotStatus};
457pub use read_only::{ReadOnlyOption, ReadState};
458pub use status::Status;
459pub use storage::{GetEntriesContext, RaftState, Storage};
460pub use tracker::{Inflights, Progress, ProgressState, ProgressTracker};
461pub use util::majority;
462
463pub use crate::raft::{
464 vote_resp_msg_type, Raft, SoftState, StateRole, CAMPAIGN_ELECTION, CAMPAIGN_PRE_ELECTION,
465 CAMPAIGN_TRANSFER, INVALID_ID, INVALID_INDEX,
466};
467
468mod confchange;
469mod config;
470mod errors;
471mod log_unstable;
472pub mod proto;
473mod quorum;
474#[cfg(test)]
475pub mod raft;
476#[cfg(not(test))]
477mod raft;
478mod raft_log;
479pub mod raw_node;
480mod read_only;
481mod status;
482pub mod storage;
483mod tracker;
484pub mod util;
485
486pub mod prelude {
487 //! A "prelude" for crates using the `raft` crate.
488 //!
489 //! This prelude is similar to the standard library's prelude in that you'll
490 //! almost always want to import its entire contents, but unlike the standard
491 //! library's prelude you'll have to do so manually:
492 //!
493 //! ```
494 //! use mephisto_raft::prelude::*;
495 //! ```
496 //!
497 //! The prelude may grow over time as additional items see ubiquitous use.
498
499 pub use crate::{
500 config::Config,
501 proto::eraftpb::{
502 ConfChange, ConfChangeSingle, ConfChangeTransition, ConfChangeType, ConfState, Entry,
503 EntryType, HardState, Message, MessageType, Snapshot, SnapshotMetadata,
504 },
505 raft::Raft,
506 raw_node::{Peer, RawNode, Ready, SnapshotStatus},
507 read_only::{ReadOnlyOption, ReadState},
508 status::Status,
509 storage::{RaftState, Storage},
510 Progress,
511 };
512}
513
514type DefaultHashBuilder = std::hash::BuildHasherDefault<fxhash::FxHasher>;
515type HashMap<K, V> = std::collections::HashMap<K, V, DefaultHashBuilder>;
516type HashSet<K> = std::collections::HashSet<K, DefaultHashBuilder>;
517
/// Logs a formatted message at error level through `tracing`, then panics with that message.
///
/// Accepts the same arguments as [`panic!`]. The arguments are evaluated and formatted exactly
/// once; the resulting text is used for both the log event and the panic message. Invoking the
/// macro without arguments behaves like `panic!()` and reports `"explicit panic"`.
///
/// The message is logged at the call site instead of from a panic hook, so the process-wide
/// panic hook is never replaced. Swapping the hook on every invocation (`take_hook` followed by
/// `set_hook`) is not atomic, stacks one more wrapper each time the macro runs, and keeps
/// logging unrelated panics afterwards.
///
/// The crate expanding this macro must have `tracing` available as a dependency.
///
/// # Panics
///
/// Always panics, after emitting the log event.
#[macro_export]
macro_rules! fatal {
    // Mirror `panic!()`, which is valid without arguments, while `format!()` is not.
    () => {
        $crate::fatal!("explicit panic")
    };
    ($($arg:tt)+) => {{
        // Format once so arguments with side effects are not evaluated twice.
        let message = ::std::format!($($arg)+);
        // Include the invocation site so the log line carries the same location that the
        // panic report will show.
        tracing::error!(
            "panicked at {}:{}:{}:\n{}",
            ::std::file!(),
            ::std::line!(),
            ::std::column!(),
            message
        );
        ::std::panic!("{}", message);
    }};
}