raft_consensus/
lib.rs

1//! # Raft
2//!
3//! This is a crate containing a Raft consensus protocol implementation and encoding/decoding
4//! helpers. This is a logic-only crate without any networking part.
5//!
6//! To use Raft in its full strength using this crate, one should do the following:
7//!
8//! * determine and implement (or take ready ones) state machine and persistent log implementations
9//!
10//! * find or make a part responsible for passing peer and client messages over the wire, and pass
11//! all these messages to one of the `...Consensus` structures
12//!
13//! * define a ConsensusHandler with callbacks doing the job for passing messages generated by
14//! consensus to other nodes
15
16extern crate byteorder;
17extern crate failure;
18#[macro_use]
19extern crate failure_derive;
20#[macro_use]
21extern crate log;
22#[cfg(test)]
23extern crate pretty_env_logger;
24extern crate uuid;
25
26#[cfg(feature = "use_serde")]
27extern crate serde;
28#[cfg(feature = "use_serde")]
29#[cfg_attr(feature = "use_serde", macro_use)]
30extern crate serde_derive;
31
32#[cfg(feature = "use_capnp")]
33extern crate capnp;
34
35pub mod error;
36pub mod persistent_log;
37pub mod state_machine;
38
39/// Implementation of Raft consensus API
40pub mod consensus;
41
42/// Messages that are passed during consensus work
43#[cfg_attr(feature = "use_capnp", macro_use)]
44pub mod message;
45
46/// Provides consensus state type
47pub mod state;
48
49/// Handlers for consensus callbacks
50pub mod handler;
51
52/// Handle consensus from many threads
53pub mod shared;
54
/// Cap'n Proto message bindings, textually included from `schema/messages_capnp.rs` inside the
/// build's `OUT_DIR`. Only compiled when the `use_capnp` feature is enabled.
#[cfg(feature = "use_capnp")]
pub mod messages_capnp {
    include!(concat!(env!("OUT_DIR"), "/schema/messages_capnp.rs"));
}
60
61use std::{fmt, ops};
62
63use error::Error;
64use uuid::Uuid;
65
66pub use consensus::Consensus;
67pub use handler::ConsensusHandler;
68pub use persistent_log::Log;
69pub use shared::SharedConsensus;
70pub use state_machine::StateMachine;
71
72#[cfg(feature = "use_capnp")]
73use messages_capnp::entry;
74
75#[cfg(feature = "use_capnp")]
76use capnp::message::{Allocator, Builder, HeapAllocator, Reader, ReaderSegments};
77
/// The term of a log entry.
///
/// A newtype over `u64`. Adding or subtracting a plain `u64` is checked: the operators panic
/// instead of wrapping when the result does not fit.
#[derive(Copy, Clone, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)]
#[cfg_attr(feature = "use_serde", derive(Serialize, Deserialize))]
pub struct Term(pub u64);

impl Term {
    /// Returns the wrapped `u64` value.
    pub fn as_u64(self) -> u64 {
        self.0
    }
}

impl From<u64> for Term {
    /// Wraps a raw `u64` into a `Term`.
    fn from(val: u64) -> Term {
        Term(val)
    }
}

// Implemented as `From` rather than `Into`: the standard blanket impl still provides
// `Into<u64> for Term`, so `term.into()` keeps working while `u64::from(term)` becomes
// available as well.
impl From<Term> for u64 {
    /// Unwraps a `Term` into its raw `u64` value.
    fn from(term: Term) -> u64 {
        term.0
    }
}

impl ops::Add<u64> for Term {
    type Output = Term;

    /// Advances the term by `rhs`.
    ///
    /// # Panics
    ///
    /// Panics if the addition overflows `u64`.
    fn add(self, rhs: u64) -> Term {
        Term(
            self.0
                .checked_add(rhs)
                .expect("overflow while incrementing Term"),
        )
    }
}

impl ops::Sub<u64> for Term {
    type Output = Term;

    /// Moves the term back by `rhs`.
    ///
    /// # Panics
    ///
    /// Panics if `rhs` is greater than the current value.
    fn sub(self, rhs: u64) -> Term {
        Term(
            self.0
                .checked_sub(rhs)
                .expect("underflow while decrementing Term"),
        )
    }
}

impl fmt::Display for Term {
    /// Formats the term as its inner `u64`, forwarding the formatter (and its flags) unchanged.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        fmt::Display::fmt(&self.0, f)
    }
}
127
/// The index of a log entry.
///
/// A newtype over `u64`. All arithmetic defined here is checked: the operators panic instead of
/// wrapping when the result does not fit.
#[derive(Copy, Clone, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)]
#[cfg_attr(feature = "use_serde", derive(Serialize, Deserialize))]
pub struct LogIndex(pub u64);

impl LogIndex {
    /// Returns the wrapped `u64` value.
    pub fn as_u64(self) -> u64 {
        self.0
    }
}

impl From<u64> for LogIndex {
    /// Wraps a raw `u64` into a `LogIndex`.
    fn from(val: u64) -> LogIndex {
        LogIndex(val)
    }
}

// Implemented as `From` rather than `Into`: the standard blanket impl still provides
// `Into<u64> for LogIndex`, so `index.into()` keeps working while `u64::from(index)` becomes
// available as well.
impl From<LogIndex> for u64 {
    /// Unwraps a `LogIndex` into its raw `u64` value.
    fn from(index: LogIndex) -> u64 {
        index.0
    }
}

impl ops::Add<u64> for LogIndex {
    type Output = LogIndex;

    /// Advances the index by `rhs`.
    ///
    /// # Panics
    ///
    /// Panics if the addition overflows `u64`.
    fn add(self, rhs: u64) -> LogIndex {
        LogIndex(
            self.0
                .checked_add(rhs)
                .expect("overflow while incrementing LogIndex"),
        )
    }
}

impl ops::Sub<u64> for LogIndex {
    type Output = LogIndex;

    /// Moves the index back by `rhs`.
    ///
    /// # Panics
    ///
    /// Panics if `rhs` is greater than the current value.
    fn sub(self, rhs: u64) -> LogIndex {
        LogIndex(
            self.0
                .checked_sub(rhs)
                .expect("underflow while decrementing LogIndex"),
        )
    }
}

/// Find the offset between two log indices.
impl ops::Sub for LogIndex {
    type Output = u64;

    /// Returns how far `self` is ahead of `rhs`.
    ///
    /// # Panics
    ///
    /// Panics if `rhs` is greater than `self`.
    fn sub(self, rhs: LogIndex) -> u64 {
        self.0
            .checked_sub(rhs.0)
            .expect("underflow while subtracting LogIndex")
    }
}

impl fmt::Display for LogIndex {
    /// Formats the index as its inner `u64`, forwarding the formatter (and its flags) unchanged.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        fmt::Display::fmt(&self.0, f)
    }
}
187
/// The ID of a Raft server. Must be unique among the participants in a
/// consensus group.
#[derive(Copy, Clone, Hash, PartialEq, Eq, Ord, PartialOrd, Debug)]
#[cfg_attr(feature = "use_serde", derive(Serialize, Deserialize))]
pub struct ServerId(pub u64);

impl ServerId {
    /// Returns the wrapped `u64` value.
    pub fn as_u64(self) -> u64 {
        self.0
    }
}

impl From<u64> for ServerId {
    /// Wraps a raw `u64` into a `ServerId`.
    fn from(val: u64) -> ServerId {
        ServerId(val)
    }
}

// Implemented as `From` rather than `Into`: the standard blanket impl still provides
// `Into<u64> for ServerId`, so `id.into()` keeps working while `u64::from(id)` becomes
// available as well.
impl From<ServerId> for u64 {
    /// Unwraps a `ServerId` into its raw `u64` value.
    fn from(id: ServerId) -> u64 {
        id.0
    }
}

impl fmt::Display for ServerId {
    /// Formats the ID as its inner `u64`, forwarding the formatter (and its flags) unchanged.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        fmt::Display::fmt(&self.0, f)
    }
}
217
218/// The ID of a Raft client.
219#[derive(Copy, Clone, Hash, PartialEq, Eq, Debug, Default)]
220#[cfg_attr(feature = "use_serde", derive(Serialize, Deserialize))]
221pub struct ClientId(pub Uuid);
222impl ClientId {
223    pub fn new() -> ClientId {
224        ClientId(Uuid::new_v4())
225    }
226
227    pub fn as_bytes(&self) -> &[u8] {
228        self.0.as_bytes()
229    }
230
231    pub fn from_bytes(bytes: &[u8]) -> Result<ClientId, Error> {
232        Uuid::from_bytes(bytes).map(ClientId).map_err(Error::Uuid)
233    }
234}
235
236impl fmt::Display for ClientId {
237    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
238        fmt::Display::fmt(&self.0, f)
239    }
240}
241
242/// Type representing a log entry
243#[derive(Debug, Clone, PartialEq)]
244#[cfg_attr(feature = "use_serde", derive(Serialize, Deserialize))]
245pub struct Entry {
246    pub term: Term,
247    pub data: Vec<u8>,
248}
249
250impl Entry {
251    pub fn new(term: Term, data: Vec<u8>) -> Self {
252        Self { term, data }
253    }
254}
255
256impl From<Entry> for (Term, Vec<u8>) {
257    fn from(e: Entry) -> (Term, Vec<u8>) {
258        (e.term, e.data)
259    }
260}
261
#[cfg(feature = "use_capnp")]
impl Entry {
    /// Reads an `Entry` out of a Cap'n Proto `entry` reader.
    ///
    /// # Errors
    ///
    /// Returns `Error::Capnp` when the reader fails to yield the data field.
    pub fn from_capnp<'a>(reader: entry::Reader<'a>) -> Result<Self, Error> {
        let term = Term::from(reader.get_term());
        let data = match reader.get_data() {
            Ok(bytes) => bytes.to_vec(),
            Err(err) => return Err(Error::Capnp(err)),
        };
        Ok(Entry { term, data })
    }

    /// Writes this entry's term and data into a Cap'n Proto `entry` builder.
    pub fn fill_capnp<'a>(&self, builder: &mut entry::Builder<'a>) {
        builder.set_term(self.term.0);
        builder.set_data(self.data.as_slice());
    }

    common_capnp!(entry::Builder, entry::Reader);
}