jopemachine_raft/lib.rs
1// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
2
3// Copyright 2015 The etcd Authors
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17/*!
18
19## Creating a Raft node
20
21You can use [`RawNode::new`] to create the Raft node. To create the Raft node, you need to
22provide a [`Storage`] component, and a [`Config`] to the [`RawNode::new`] function.
23
24```rust
25use raft::{
26 logger::Slogger,
27 Config,
28 storage::MemStorage,
29 raw_node::RawNode,
30};
31use std::sync::Arc;
32use slog::{Drain, o};
33
34// Select some defaults, then change what we need.
35let config = Config {
36 id: 1,
37 ..Default::default()
38};
39// Initialize logger.
40let logger = slog::Logger::root(slog_stdlog::StdLog.fuse(), o!());
41let logger = Arc::new(Slogger { slog: logger.clone() });
42// ... Make any configuration changes.
43// After, make sure it's valid!
44config.validate().unwrap();
45// We'll use the built-in `MemStorage`, but you will likely want your own.
46// Finally, create our Raft node!
47let storage = MemStorage::new_with_conf_state((vec![1], vec![]));
48let mut node = RawNode::new(&config, storage, logger).unwrap();
49```
50
51## Ticking the Raft node
52
53Use a timer to tick the Raft node at regular intervals. See the following example using Rust
54channel `recv_timeout` to drive the Raft node at least every 100ms, calling
55[`tick()`](RawNode::tick) each time.
56
57```rust
58# use slog::{Drain, o};
59# use std::sync::Arc;
60# use raft::{logger::Slogger, Config, storage::MemStorage, raw_node::RawNode};
61# let config = Config { id: 1, ..Default::default() };
62# let store = MemStorage::new_with_conf_state((vec![1], vec![]));
63# let logger = slog::Logger::root(slog_stdlog::StdLog.fuse(), o!());
64# let logger = Arc::new(Slogger { slog: logger.clone() });
65# let mut node = RawNode::new(&config, store, logger).unwrap();
66# node.raft.become_candidate();
67# node.raft.become_leader();
68use std::{sync::mpsc::{channel, RecvTimeoutError}, time::{Instant, Duration}};
69
70// We're using a channel, but this could be any stream of events.
71let (tx, rx) = channel();
72let timeout = Duration::from_millis(100);
73let mut remaining_timeout = timeout;
74
75// Send the `tx` somewhere else...
76
77loop {
78 let now = Instant::now();
79
80 match rx.recv_timeout(remaining_timeout) {
81 Ok(()) => {
82 // Let's save this for later.
83 unimplemented!()
84 },
85 Err(RecvTimeoutError::Timeout) => (),
86 Err(RecvTimeoutError::Disconnected) => unimplemented!(),
87 }
88
89 let elapsed = now.elapsed();
90 if elapsed >= remaining_timeout {
91 remaining_timeout = timeout;
92 // We drive Raft every 100ms.
93 node.tick();
94 } else {
95 remaining_timeout -= elapsed;
96 }
97# break;
98}
99```
100
101## Proposing to, and stepping the Raft node
102
103Using the `propose` function you can drive the Raft node when the client sends a request to the
104Raft server. You can call `propose` to add the request to the Raft log explicitly.
105
In most cases, the client needs to wait for a response to its request. For example, a client
that writes a value to a key wants to know whether the write succeeded. However, writes in Raft
are asynchronous: the log entry must be replicated to the followers, committed, and finally
applied to the state machine. So we need a way to notify the client once the write has finished.
111
112One simple way is to use a unique ID for the client request, and save the associated callback
113function in a hash map. When the log entry is applied, we can get the ID from the decoded entry,
114call the corresponding callback, and notify the client.
115
116You can call the `step` function when you receive the Raft messages from other nodes.
117
118Here is a simple example to use `propose` and `step`:
119
120```rust
121# use raft::{logger::Slogger, Config, storage::MemStorage, raw_node::RawNode, eraftpb::Message};
122# use std::{
123# sync::Arc,
124# sync::mpsc::{channel, RecvTimeoutError},
125# time::{Instant, Duration},
126# collections::HashMap
127# };
128# use slog::{Drain, o};
129#
130# let config = Config { id: 1, ..Default::default() };
131# let store = MemStorage::new_with_conf_state((vec![1], vec![]));
132# let logger = slog::Logger::root(slog_stdlog::StdLog.fuse(), o!());
133# let logger = Arc::new(Slogger { slog: logger.clone() });
134# let mut node = RawNode::new(&config, store, logger).unwrap();
135# node.raft.become_candidate();
136# node.raft.become_leader();
137#
138# let (tx, rx) = channel();
139# let timeout = Duration::from_millis(100);
140# let mut remaining_timeout = timeout;
141#
142enum Msg {
143 Propose {
144 id: u8,
145 callback: Box<dyn Fn() + Send>,
146 },
147 Raft(Message),
148}
149
150// Simulate a message coming down the stream.
151tx.send(Msg::Propose { id: 1, callback: Box::new(|| ()) });
152
153let mut cbs = HashMap::new();
154loop {
155 let now = Instant::now();
156
157 match rx.recv_timeout(remaining_timeout) {
158 Ok(Msg::Propose { id, callback }) => {
159 cbs.insert(id, callback);
160 node.propose(vec![], vec![id]).unwrap();
161 }
162 Ok(Msg::Raft(m)) => node.step(m).unwrap(),
163 Err(RecvTimeoutError::Timeout) => (),
164 Err(RecvTimeoutError::Disconnected) => unimplemented!(),
165 }
166
167 let elapsed = now.elapsed();
168 if elapsed >= remaining_timeout {
169 remaining_timeout = timeout;
170 // We drive Raft every 100ms.
171 node.tick();
172 } else {
173 remaining_timeout -= elapsed;
174 }
# break;
176}
177```
178
179In the above example, we use a channel to receive the `propose` and `step` messages. We only
180propose the request ID to the Raft log. In your own practice, you can embed the ID in your request
181and propose the encoded binary request data.
182
183## Processing the `Ready` State
184
185When your Raft node is ticked and running, Raft should enter a `Ready` state. You need to first use
186`has_ready` to check whether Raft is ready. If yes, use the `ready` function to get a `Ready`
187state:
188
189```rust
190# use slog::{Drain, o};
191# use std::sync::Arc;
192# use raft::{logger::Slogger, Config, storage::MemStorage, raw_node::RawNode};
193#
194# let config = Config { id: 1, ..Default::default() };
195# config.validate().unwrap();
196# let store = MemStorage::new_with_conf_state((vec![1], vec![]));
197# let logger = slog::Logger::root(slog_stdlog::StdLog.fuse(), o!());
198# let logger = Arc::new(Slogger { slog: logger.clone() });
199# let mut node = RawNode::new(&config, store, logger).unwrap();
200#
201if !node.has_ready() {
202 return;
203}
204
205// The Raft is ready, we can do something now.
206let mut ready = node.ready();
207```
208
209The `Ready` state contains quite a bit of information, and you need to check and process them one
210by one:
211
2121. Check whether `messages` is empty or not. If not, it means that the node will send messages to
213other nodes:
214
215 ```rust
216 # use slog::{Drain, o};
217 # use std::sync::Arc;
218 # use raft::{logger::Slogger, Config, storage::MemStorage, raw_node::RawNode, StateRole};
219 #
220 # let config = Config { id: 1, ..Default::default() };
221 # config.validate().unwrap();
222 # let store = MemStorage::new_with_conf_state((vec![1], vec![]));
223 # let logger = slog::Logger::root(slog_stdlog::StdLog.fuse(), o!());
224 # let logger = Arc::new(Slogger { slog: logger.clone() });
225 # let mut node = RawNode::new(&config, store, logger).unwrap();
226 #
227 # if !node.has_ready() {
228 # return;
229 # }
230 # let mut ready = node.ready();
231 #
232 if !ready.messages().is_empty() {
233 for msg in ready.take_messages() {
234 // Send messages to other peers.
235 }
236 }
237 ```
238
2392. Check whether `snapshot` is empty or not. If not empty, it means that the Raft node has received
240a Raft snapshot from the leader and we must apply the snapshot:
241
242 ```rust
243 # use slog::{Drain, o};
244 # use std::sync::Arc;
245 # use raft::{logger::Slogger, Config, storage::MemStorage, raw_node::RawNode};
246 #
247 # let config = Config { id: 1, ..Default::default() };
248 # config.validate().unwrap();
249 # let store = MemStorage::new_with_conf_state((vec![1], vec![]));
250 # let logger = slog::Logger::root(slog_stdlog::StdLog.fuse(), o!());
251 # let logger = Arc::new(Slogger { slog: logger.clone() });
252 # let mut node = RawNode::new(&config, store, logger).unwrap();
253 #
254 # if !node.has_ready() {
255 # return;
256 # }
257 # let mut ready = node.ready();
258 #
259 if !ready.snapshot().is_empty() {
260 // This is a snapshot, we need to apply the snapshot at first.
261 node.mut_store()
262 .wl()
263 .apply_snapshot(ready.snapshot().clone())
264 .unwrap();
265 }
266
267 ```
268
3. Check whether `committed_entries` is empty or not. If not, it means that there are some newly
committed log entries which you must apply to the state machine. After applying them, you
need to record the applied index so that applying can be resumed from it later:
272
273 ```rust
274 # use slog::{Drain, o};
275 # use std::sync::Arc;
276 # use raft::{logger::Slogger, Config, storage::MemStorage, raw_node::RawNode, eraftpb::EntryType};
277 #
278 # let config = Config { id: 1, ..Default::default() };
279 # config.validate().unwrap();
280 # let store = MemStorage::new_with_conf_state((vec![1], vec![]));
281 # let logger = slog::Logger::root(slog_stdlog::StdLog.fuse(), o!());
282 # let logger = Arc::new(Slogger { slog: logger.clone() });
283 # let mut node = RawNode::new(&config, store, logger).unwrap();
284 #
285 # if !node.has_ready() {
286 # return;
287 # }
288 # let mut ready = node.ready();
289 #
290 # fn handle_conf_change(e: raft::eraftpb::Entry) {
291 # }
292 #
293 # fn handle_conf_change_v2(e: raft::eraftpb::Entry) {
294 # }
295 #
296 # fn handle_normal(e: raft::eraftpb::Entry) {
297 # }
298 #
299 let mut _last_apply_index = 0;
300 for entry in ready.take_committed_entries() {
301 // Mostly, you need to save the last apply index to resume applying
302 // after restart. Here we just ignore this because we use a Memory storage.
303 _last_apply_index = entry.index;
304
305 if entry.data.is_empty() {
            // Empty entry: when the peer becomes Leader it will send an empty entry.
307 continue;
308 }
309
310 match entry.get_entry_type() {
311 EntryType::EntryNormal => handle_normal(entry),
            // It's recommended to always use `EntryType::EntryConfChangeV2`.
313 EntryType::EntryConfChange => handle_conf_change(entry),
314 EntryType::EntryConfChangeV2 => handle_conf_change_v2(entry),
315 }
316 }
317 ```
318
   Note: although Raft guarantees that only persisted committed entries will be applied,
   it doesn't guarantee that the commit index is persisted before those entries are applied.
   For example, if the application restarts after applying committed entries but before
   persisting the commit index, the applied index can be larger than the commit index, which
   causes a panic. To solve the problem, persist the commit index together with, or before,
   applying entries. You can also always set the commit index to `max(commit_index, applied_index)`
   after restarting; *this may work, but potential log loss may then be silently ignored*.
326
3274. Check whether `entries` is empty or not. If not empty, it means that there are newly added
328entries but have not been committed yet, we must append the entries to the Raft log:
329
330 ```rust
331 # use slog::{Drain, o};
332 # use std::sync::Arc;
333 # use raft::{logger::Slogger, Config, storage::MemStorage, raw_node::RawNode};
334 #
335 # let config = Config { id: 1, ..Default::default() };
336 # config.validate().unwrap();
337 # let store = MemStorage::new_with_conf_state((vec![1], vec![]));
338 # let logger = slog::Logger::root(slog_stdlog::StdLog.fuse(), o!());
339 # let logger = Arc::new(Slogger { slog: logger.clone() });
340 # let mut node = RawNode::new(&config, store, logger).unwrap();
341 #
342 # if !node.has_ready() {
343 # return;
344 # }
345 # let mut ready = node.ready();
346 #
347 if !ready.entries().is_empty() {
348 // Append entries to the Raft log
349 node.mut_store().wl().append(ready.entries()).unwrap();
350 }
351
352 ```
353
3545. Check whether `hs` is empty or not. If not empty, it means that the `HardState` of the node has
355changed. For example, the node may vote for a new leader, or the commit index has been increased.
356We must persist the changed `HardState`:
357
358 ```rust
359 # use slog::{Drain, o};
360 # use std::sync::Arc;
361 # use raft::{logger::Slogger, Config, storage::MemStorage, raw_node::RawNode};
362 #
363 # let config = Config { id: 1, ..Default::default() };
364 # config.validate().unwrap();
365 # let store = MemStorage::new_with_conf_state((vec![1], vec![]));
366 # let logger = slog::Logger::root(slog_stdlog::StdLog.fuse(), o!());
367 # let logger = Arc::new(Slogger { slog: logger.clone() });
368 # let mut node = RawNode::new(&config, store, logger).unwrap();
369 #
370 # if !node.has_ready() {
371 # return;
372 # }
373 # let mut ready = node.ready();
374 #
375 if let Some(hs) = ready.hs() {
376 // Raft HardState changed, and we need to persist it.
377 node.mut_store().wl().set_hardstate(hs.clone());
378 }
379 ```
380
3816. Check whether `persisted_messages` is empty or not. If not, it means that the node will send messages to
382other nodes after persisting hardstate, entries and snapshot:
383
384 ```rust
385 # use slog::{Drain, o};
386 # use std::sync::Arc;
387 # use raft::{logger::Slogger, Config, storage::MemStorage, raw_node::RawNode, StateRole};
388 #
389 # let config = Config { id: 1, ..Default::default() };
390 # config.validate().unwrap();
391 # let store = MemStorage::new_with_conf_state((vec![1], vec![]));
392 # let logger = slog::Logger::root(slog_stdlog::StdLog.fuse(), o!());
393 # let logger = Arc::new(Slogger { slog: logger.clone() });
394 # let mut node = RawNode::new(&config, store, logger).unwrap();
395 #
396 # if !node.has_ready() {
397 # return;
398 # }
399 # let mut ready = node.ready();
400 #
401 if !ready.persisted_messages().is_empty() {
402 for msg in ready.take_persisted_messages() {
403 // Send persisted messages to other peers.
404 }
405 }
406 ```
407
7. Call `advance` to notify that the previous work is completed. Get the return value `LightReady`
and handle its `messages` and `committed_entries` as in steps 1 and 3. Then call `advance_apply`
to advance the applied index inside Raft.
411
412 ```rust
413 # use slog::{Drain, o};
414 # use std::sync::Arc;
415 # use raft::{logger::Slogger, Config, storage::MemStorage, raw_node::RawNode};
416 # use raft::eraftpb::{EntryType, Entry, Message};
417 #
418 # let config = Config { id: 1, ..Default::default() };
419 # config.validate().unwrap();
420 # let store = MemStorage::new_with_conf_state((vec![1], vec![]));
421 # let logger = slog::Logger::root(slog_stdlog::StdLog.fuse(), o!());
422 # let logger = Arc::new(Slogger { slog: logger.clone() });
423 # let mut node = RawNode::new(&config, store, logger).unwrap();
424 #
425 # if !node.has_ready() {
426 # return;
427 # }
428 # let mut ready = node.ready();
429 #
430 # fn handle_messages(msgs: Vec<Message>) {
431 # }
432 #
433 # fn handle_committed_entries(committed_entries: Vec<Entry>) {
434 # }
435 let mut light_rd = node.advance(ready);
436 // Like step 1 and 3, you can use functions to make them behave the same.
437 handle_messages(light_rd.take_messages());
438 handle_committed_entries(light_rd.take_committed_entries());
439 node.advance_apply();
440 ```
441
442For more information, check out an [example](examples/single_mem_node/main.rs#L113-L179).
443
Sometimes it's better not to block the Raft node on IO operations, so that the latency of
reads/writes is more predictable and the fsync frequency can be controlled. The crate
supports async ready to offload IO operations to another thread. The usage is the same as
above, except:
4481. All writes are not required to be persisted immediately, they can be written into memory caches;
4492. Persisted messages should be sent after all corresponding writes are persisted;
4503. [`advance_append_async`](RawNode::advance_append_async) is used when all writes are finished
451 instead of `advance/advance_append`.
4524. Only persisted entries can be committed and applied, so to make progress, all writes should
453 be persisted at some point.
454
455## Arbitrary Membership Changes
456
457When building a resilient, scalable distributed system there is a strong need to be able to change
458the membership of a peer group *dynamically, without downtime.* This Raft crate supports this via
459**Joint Consensus**
460([Raft paper, section 6](https://web.stanford.edu/~ouster/cgi-bin/papers/raft-atc14)).
461
462It permits resilient arbitrary dynamic membership changes. A membership change can do any or all of
463the following:
464
465* Add peer (learner or voter) *n* to the group.
466* Remove a learner *n* from the group.
467* Promote a learner to a voter.
468* Demote a voter back to learner.
469* Replace a node *n* with another node *m*.
470
For example, to add node 4 as a voter (promoting it if it is currently a learner) and remove voter 3:
472```no_run
473# use std::sync::Arc;
474# use raft::{logger::Slogger, Config, storage::MemStorage, raw_node::RawNode, eraftpb::*};
475# use protobuf::Message as PbMessage;
476# use slog::{Drain, o};
477#
478# let mut config = Config { id: 1, ..Default::default() };
479# let store = MemStorage::new_with_conf_state((vec![1, 2], vec![]));
480# let logger = slog::Logger::root(slog_stdlog::StdLog.fuse(), o!());
481# let logger = Arc::new(Slogger { slog: logger.clone() });
482# let mut node = RawNode::new(&mut config, store, logger).unwrap();
483let steps = vec![
484 raft_proto::new_conf_change_single(4, ConfChangeType::AddNode),
485 raft_proto::new_conf_change_single(3, ConfChangeType::RemoveNode),
486];
487let mut cc = ConfChangeV2::default();
488cc.set_changes(steps.into());
489node.propose_conf_change(vec![], cc).unwrap();
490// After the log is committed and applied
491// node.apply_conf_change(&cc).unwrap();
492```
493
494This process is a two-phase process, during the midst of it the peer group's leader is managing
495**two independent, possibly overlapping peer sets**.
496
497> **Note:** In order to maintain resiliency guarantees (progress while a majority of both peer sets is
498active), it is recommended to wait until the entire peer group has exited the transition phase
499before taking old, removed peers offline.
500
501*/
502
503#![deny(clippy::all)]
504#![deny(missing_docs)]
505#![recursion_limit = "128"]
// TODO: remove this when we update the minimum rust compatible version.
507#![allow(unused_imports)]
508// This is necessary to support prost and rust-protobuf at the same time.
509#![allow(clippy::useless_conversion)]
510// This lint recommends some bad choices sometimes.
511#![allow(clippy::unnecessary_unwrap)]
// We use the `default` method a lot to support prost and rust-protobuf at the
// same time, and the compiler can optimize the reassignment away.
514#![allow(clippy::field_reassign_with_default)]
515
/// Panics with a message, appending the key/value context carried by `$logger`.
///
/// Two forms are accepted:
/// * `fatal!(logger, msg)` — `msg` is any `Display` value.
/// * `fatal!(logger, "fmt {}", args..)` — the message is built with `format_args!`.
///
/// The logger's context (obtained through its `list()` method and rendered by
/// `crate::util::format_kv_list`) is appended after a `", "` separator, or omitted
/// entirely when it renders to an empty string.
macro_rules! fatal {
    ($logger:expr, $msg:expr) => {{
        let kv_pairs = ($logger).list();
        let context = crate::util::format_kv_list(&kv_pairs);
        // Only include the separator when there is context to show.
        if !context.is_empty() {
            panic!("{}, {}", $msg, context)
        }
        panic!("{}", $msg)
    }};
    ($logger:expr, $fmt:expr, $($arg:tt)+) => {{
        // Forward to the single-message form with a lazily formatted message.
        fatal!($logger, format_args!($fmt, $($arg)+))
    }};
}
530
531mod confchange;
532mod config;
533mod errors;
534#[allow(missing_docs)]
535pub mod formatter;
536mod log_unstable;
537pub mod logger;
538mod quorum;
539mod raft;
540mod raft_log;
541pub mod raw_node;
542mod read_only;
543mod status;
544pub mod storage;
545mod tracker;
546pub mod util;
547
548pub use crate::raft::{
549 vote_resp_msg_type, Raft, SoftState, StateRole, CAMPAIGN_ELECTION, CAMPAIGN_PRE_ELECTION,
550 CAMPAIGN_TRANSFER, INVALID_ID, INVALID_INDEX,
551};
552pub use confchange::{Changer, MapChange};
553pub use config::Config;
554pub use errors::{Error, Result, StorageError};
555pub use log_unstable::Unstable;
556pub use quorum::joint::Configuration as JointConfig;
557pub use quorum::majority::Configuration as MajorityConfig;
558pub use raft_log::{RaftLog, NO_LIMIT};
559pub use raft_proto::eraftpb;
560#[allow(deprecated)]
561pub use raw_node::is_empty_snap;
562pub use raw_node::{LightReady, Peer, RawNode, Ready, SnapshotStatus};
563pub use read_only::{ReadOnlyOption, ReadState};
564pub use status::Status;
565pub use storage::{GetEntriesContext, RaftState, Storage};
566pub use tracker::{Inflights, Progress, ProgressState, ProgressTracker};
567pub use util::majority;
568
569pub mod prelude {
570 //! A "prelude" for crates using the `raft` crate.
571 //!
572 //! This prelude is similar to the standard library's prelude in that you'll
573 //! almost always want to import its entire contents, but unlike the standard
574 //! library's prelude you'll have to do so manually:
575 //!
576 //! ```
577 //! use raft::prelude::*;
578 //! ```
579 //!
580 //! The prelude may grow over time as additional items see ubiquitous use.
581
582 pub use raft_proto::prelude::*;
583
584 pub use crate::config::Config;
585 pub use crate::formatter;
586 pub use crate::raft::Raft;
587
588 pub use crate::storage::{RaftState, Storage};
589
590 pub use crate::raw_node::{Peer, RawNode, Ready, SnapshotStatus};
591
592 pub use crate::Progress;
593
594 pub use crate::status::Status;
595
596 pub use crate::read_only::{ReadOnlyOption, ReadState};
597}
598
/// The default logger we fall back to when passed `None` in external facing constructors.
///
/// The root logger writes to the terminal (`slog_term` compact format) and is
/// filtered through `slog_envlogger`. It is built once behind a `Once`, so concurrent
/// callers never clobber each other. Each call returns a cheap child of that root.
/// If the current thread is named, the child is tagged with a `case` key holding the
/// segment after the last `:` in the name. Under the libtest harness that segment is
/// the test function name.
#[cfg(any(test, feature = "default-logger"))]
pub fn default_logger() -> slog::Logger {
    use slog::{o, Drain};
    use std::sync::{Mutex, Once};

    static LOGGER_INITIALIZED: Once = Once::new();
    static mut LOGGER: Option<slog::Logger> = None;

    // SAFETY: `LOGGER` is written exactly once, inside `call_once`. `Once` guarantees
    // that this write completes and is visible before `call_once` returns on any thread.
    // After initialization `LOGGER` is only ever read, so handing out a shared
    // reference is sound. The reference is taken through `addr_of!` so that no
    // reference to the `static mut` is formed directly (`static_mut_refs` lint).
    let logger: &slog::Logger = unsafe {
        LOGGER_INITIALIZED.call_once(|| {
            let decorator = slog_term::TermDecorator::new().build();
            let drain = slog_term::CompactFormat::new(decorator).build();
            let drain = slog_envlogger::new(drain);
            LOGGER = Some(slog::Logger::root(Mutex::new(drain).fuse(), o!()));
        });
        (*std::ptr::addr_of!(LOGGER))
            .as_ref()
            .expect("LOGGER is initialized by call_once above")
    };

    // `rsplit(':').next()` yields the same final segment as `split(':').last()`.
    // It avoids walking the whole name and does not trip clippy's
    // `double_ended_iterator_last`, which `#![deny(clippy::all)]` would make fatal.
    match std::thread::current()
        .name()
        .and_then(|name| name.rsplit(':').next())
    {
        Some(case) => logger.new(o!("case" => case.to_string())),
        None => logger.new(o!()),
    }
}
628
629type DefaultHashBuilder = std::hash::BuildHasherDefault<fxhash::FxHasher>;
630type HashMap<K, V> = std::collections::HashMap<K, V, DefaultHashBuilder>;
631type HashSet<K> = std::collections::HashSet<K, DefaultHashBuilder>;