1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
//! All public-facing message types.
//!
//! For users of this Raft implementation, this module defines the data types of this crate's API.
//! The `RaftNetwork` trait is built entirely on these messages, and communication with the
//! `Raft` actor is based entirely on these messages and the messages in the `admin` module.

use actix::prelude::*;
use serde::{Serialize, Deserialize};

use crate::{AppData, AppDataResponse, AppError, NodeId};

//////////////////////////////////////////////////////////////////////////////////////////////////
// AppendEntriesRequest //////////////////////////////////////////////////////////////////////////

/// An RPC invoked by the leader to replicate log entries (§5.3); also used as heartbeat (§5.2).
///
/// ### actix::Message
/// Applications using this Raft implementation are responsible for implementing the
/// networking/transport layer which must move RPCs between nodes. Once the application instance
/// receives a Raft RPC, it must send the RPC to the Raft node via its `actix::Addr` and then
/// return the response to the original sender.
///
/// The result type of calling the Raft actor with this message type is
/// `Result<AppendEntriesResponse, ()>`. The Raft spec assigns no significance to failures during
/// the handling or sending of RPCs and all RPCs are handled in an idempotent fashion, so Raft
/// will almost always retry sending a failed RPC, depending on the state of the Raft.
#[derive(Debug, Serialize, Deserialize)]
pub struct AppendEntriesRequest<D: AppData> {
    /// A non-standard field, this is the ID of the intended recipient of this RPC.
    pub target: u64,
    /// The leader's current term.
    pub term: u64,
    /// The leader's ID. Useful in redirecting clients.
    pub leader_id: u64,
    /// The index of the log entry immediately preceding the new entries.
    pub prev_log_index: u64,
    /// The term of the `prev_log_index` entry.
    pub prev_log_term: u64,
    /// The new log entries to store.
    ///
    /// This may be empty when the leader is sending heartbeats. Entries
    /// may be batched for efficiency.
    #[serde(bound="D: AppData")]
    pub entries: Vec<Entry<D>>,
    /// The leader's commit index.
    pub leader_commit: u64,
}

impl<D> Message for AppendEntriesRequest<D>
where
    D: AppData,
{
    /// Outcome of handling an `AppendEntriesRequest`.
    ///
    /// A failure carries no payload (`()`): Raft attaches no meaning to RPC failures and will
    /// generally resend the request whenever the current state of the Raft allows it.
    type Result = Result<AppendEntriesResponse, ()>;
}

/// An RPC response to an `AppendEntriesRequest` message.
///
/// This is the success value of the `Result` produced when the Raft actor handles an
/// `AppendEntriesRequest`.
#[derive(Debug, Serialize, Deserialize)]
pub struct AppendEntriesResponse {
    /// The responding node's current term, for leader to update itself.
    pub term: u64,
    /// Will be true if follower contained entry matching `prev_log_index` and `prev_log_term`.
    pub success: bool,
    /// A value used to implement the _conflicting term_ optimization outlined in §5.3.
    ///
    /// This value will only be present, and should only be considered, when `success` is `false`.
    pub conflict_opt: Option<ConflictOpt>,
}

/// A struct used to implement the _conflicting term_ optimization outlined in §5.3 for log replication.
///
/// It is carried in `AppendEntriesResponse::conflict_opt`. This value will only be present, and
/// should only be considered, when an `AppendEntriesResponse` object has a `success` value of
/// `false`.
///
/// This implementation of Raft uses this value to more quickly synchronize a leader with its
/// followers which may be some distance behind in replication, may have conflicting entries, or
/// which may be new to the cluster.
#[derive(Debug, Serialize, Deserialize)]
pub struct ConflictOpt {
    /// The term of the most recent entry which does not conflict with the received request.
    pub term: u64,
    /// The index of the most recent entry which does not conflict with the received request.
    pub index: u64,
}

/// A Raft log entry.
///
/// Entries are replicated in `AppendEntriesRequest::entries`. Entries that point at a snapshot
/// can be built with `Entry::new_snapshot_pointer`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct Entry<D: AppData> {
    /// This entry's term.
    pub term: u64,
    /// This entry's index.
    pub index: u64,
    /// This entry's payload.
    #[serde(bound="D: AppData")]
    pub payload: EntryPayload<D>,
}

impl<D: AppData> Entry<D> {
    /// Build the log entry at `index` (written in `term`) whose payload is the snapshot
    /// pointer `pointer`.
    pub fn new_snapshot_pointer(pointer: EntrySnapshotPointer, index: u64, term: u64) -> Self {
        let payload = EntryPayload::SnapshotPointer(pointer);
        Self { index, term, payload }
    }
}

/// Log entry payload variants.
///
/// Every `Entry` carries exactly one of these as its `payload`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum EntryPayload<D: AppData> {
    /// An empty payload committed by a new cluster leader.
    Blank,
    /// A normal log entry holding application data (see `EntryNormal`).
    #[serde(bound="D: AppData")]
    Normal(EntryNormal<D>),
    /// A config change log entry holding a new `MembershipConfig` (see `EntryConfigChange`).
    ConfigChange(EntryConfigChange),
    /// An entry which points to a snapshot (see `EntrySnapshotPointer`).
    SnapshotPointer(EntrySnapshotPointer),
}

/// A normal log entry.
///
/// Wraps a single item of application data `D`. Clients submit these through
/// `ClientPayload::new`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct EntryNormal<D: AppData> {
    /// The contents of this entry.
    #[serde(bound="D: AppData")]
    pub data: D,
}

/// A log entry holding a config change.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct EntryConfigChange {
    /// The complete membership configuration introduced by this config change: voting members,
    /// non-voters, nodes being removed, and the joint-consensus flag.
    pub membership: MembershipConfig,
}

/// A log entry pointing to a snapshot.
///
/// This will only be present when read from storage. An entry of this type will never be
/// transmitted from a leader during replication; an `InstallSnapshotRequest` RPC will be sent
/// instead.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct EntrySnapshotPointer {
    /// The location of the snapshot file on disk.
    pub path: String,
}

//////////////////////////////////////////////////////////////////////////////////////////////////
// MembershipConfig //////////////////////////////////////////////////////////////////////////////

/// A model of the membership configuration of the cluster.
///
/// The helper methods `contains`, `all_nodes` and `len` consider only `members` and
/// `non_voters`. Nodes listed only in `removing` are not counted by them.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct MembershipConfig {
    /// A flag indicating if the system is currently in a joint consensus state.
    pub is_in_joint_consensus: bool,
    /// Voting members of the Raft cluster.
    pub members: Vec<NodeId>,
    /// Non-voting members of the cluster.
    ///
    /// These nodes are being brought up-to-speed by the leader and will be transitioned over to
    /// being standard members once they are up-to-date.
    pub non_voters: Vec<NodeId>,
    /// The set of nodes which are to be removed after joint consensus is complete.
    pub removing: Vec<NodeId>,
}

impl MembershipConfig {
    /// Check if the given NodeId exists in this membership config.
    ///
    /// This checks only the contents of `members` & `non_voters`; a node listed only in
    /// `removing` is reported as absent.
    pub fn contains(&self, x: &NodeId) -> bool {
        self.members.contains(x) || self.non_voters.contains(x)
    }

    /// Get an iterator over all nodes in the current config.
    ///
    /// Yields every entry of `members` followed by every entry of `non_voters`; `removing` is
    /// not included.
    pub fn all_nodes(&self) -> impl Iterator<Item=&NodeId> {
        self.members.iter().chain(self.non_voters.iter())
    }

    /// Get the combined length of the `members` & `non_voters` vectors.
    ///
    /// This equals the number of items yielded by `all_nodes`.
    pub fn len(&self) -> usize {
        self.members.len() + self.non_voters.len()
    }

    /// Returns `true` when both `members` and `non_voters` are empty, i.e. when `len() == 0`.
    ///
    /// Provided alongside the public `len`, as is conventional for Rust collection-like types.
    pub fn is_empty(&self) -> bool {
        self.members.is_empty() && self.non_voters.is_empty()
    }
}

//////////////////////////////////////////////////////////////////////////////////////////////////
// VoteRequest ///////////////////////////////////////////////////////////////////////////////////

/// An RPC invoked by candidates to gather votes (§5.2).
///
/// ### actix::Message
/// Applications using this Raft implementation are responsible for implementing the
/// networking/transport layer which must move RPCs between nodes. Once the application instance
/// receives a Raft RPC, it must send the RPC to the Raft node via its `actix::Addr` and then
/// return the response to the original sender.
///
/// The result type of calling the Raft actor with this message type is `Result<VoteResponse, ()>`.
/// The Raft spec assigns no significance to failures during the handling or sending of RPCs and
/// all RPCs are handled in an idempotent fashion, so Raft will almost always retry sending a
/// failed RPC, depending on the state of the Raft.
#[derive(Debug, Serialize, Deserialize)]
pub struct VoteRequest {
    /// A non-standard field, this is the ID of the intended recipient of this RPC.
    pub target: u64,
    /// The candidate's current term.
    pub term: u64,
    /// The candidate's ID.
    pub candidate_id: u64,
    /// The index of the candidate’s last log entry (§5.4).
    pub last_log_index: u64,
    /// The term of the candidate’s last log entry (§5.4).
    pub last_log_term: u64,
}

impl Message for VoteRequest {
    /// Outcome of handling a `VoteRequest`.
    ///
    /// Errors are plain `()` because Raft gives RPC failures no meaning; the request will
    /// generally be retried whenever the current state of the Raft permits.
    type Result = Result<VoteResponse, ()>;
}

impl VoteRequest {
    /// Create a new instance.
    pub fn new(target: u64, term: u64, candidate_id: u64, last_log_index: u64, last_log_term: u64) -> Self {
        Self{target, term, candidate_id, last_log_index, last_log_term}
    }
}

/// An RPC response to a `VoteRequest` message.
#[derive(Debug, Serialize, Deserialize)]
pub struct VoteResponse {
    /// The current term of the responding node, for the candidate to update itself.
    pub term: u64,
    /// Will be true if the candidate received a vote from the responder.
    pub vote_granted: bool,
    /// Will be true if the candidate is unknown to the responding node's config.
    ///
    /// If this field is true, and the sender's (the candidate's) index is greater than 0, then it
    /// should revert to the NonVoter state; if the sender's index is 0, then resume campaigning.
    pub is_candidate_unknown: bool,
}

//////////////////////////////////////////////////////////////////////////////////////////////////
// InstallSnapshotRequest ////////////////////////////////////////////////////////////////////////

/// Invoked by the Raft leader to send chunks of a snapshot to a follower (§7).
///
/// ### actix::Message
/// Applications using this Raft implementation are responsible for implementing the
/// networking/transport layer which must move RPCs between nodes. Once the application instance
/// receives a Raft RPC, it must send the RPC to the Raft node via its `actix::Addr` and then
/// return the response to the original sender.
///
/// The result type of calling the Raft actor with this message type is
/// `Result<InstallSnapshotResponse, ()>`. The Raft spec assigns no significance to failures during
/// the handling or sending of RPCs and all RPCs are handled in an idempotent fashion, so Raft will
/// almost always retry sending a failed RPC, depending on the state of the Raft.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct InstallSnapshotRequest {
    /// A non-standard field, this is the ID of the intended recipient of this RPC.
    pub target: u64,
    /// The leader's current term.
    pub term: u64,
    /// The leader's ID. Useful in redirecting clients.
    pub leader_id: u64,
    /// The snapshot replaces all log entries up through and including this index.
    pub last_included_index: u64,
    /// The term of the `last_included_index`.
    pub last_included_term: u64,
    /// The byte offset where chunk is positioned in the snapshot file.
    pub offset: u64,
    /// The raw bytes of the snapshot chunk, starting at `offset`.
    pub data: Vec<u8>,
    /// Will be `true` if this is the last chunk in the snapshot.
    pub done: bool,
}

impl Message for InstallSnapshotRequest {
    /// Outcome of handling one snapshot chunk.
    ///
    /// Failures are reported as `()` since Raft treats RPC failures as meaningless; the chunk
    /// will almost always be resent if the current state of the Raft allows it.
    type Result = Result<InstallSnapshotResponse, ()>;
}

/// An RPC response to an `InstallSnapshotRequest` message.
#[derive(Debug, Serialize, Deserialize)]
pub struct InstallSnapshotResponse {
    /// The receiving node's current term, for leader to update itself.
    pub term: u64,
}

//////////////////////////////////////////////////////////////////////////////////////////////////
// ClientPayload /////////////////////////////////////////////////////////////////////////////////

/// A payload with an entry coming from a client request.
///
/// The entry of this payload will be appended to the Raft log and then applied to the Raft state
/// machine according to the Raft protocol.
///
/// ### actix::Message
/// Applications using this Raft implementation are responsible for implementing the
/// networking/transport layer which must move RPCs between nodes. Once the application instance
/// receives a Raft RPC, it must send the RPC to the Raft node via its `actix::Addr` and then
/// return the response to the original sender.
///
/// The result type of calling the Raft actor with this message type is
/// `Result<ClientPayloadResponse<R>, ClientError<D, R, E>>`. Applications built around this
/// implementation of Raft will often need to perform their own custom logic in the storage layer,
/// and it is often critical to be able to surface such errors to the application and its clients.
/// To that end, the `ClientError::Application` variant allows for the communication of
/// application specific errors of type `E`.
#[derive(Debug, Serialize, Deserialize)]
pub struct ClientPayload<D: AppData, R: AppDataResponse, E: AppError> {
    /// The application specific contents of this client request.
    #[serde(bound="D: AppData")]
    pub(crate) entry: EntryPayload<D>,
    /// The response mode needed by this request.
    pub(crate) response_mode: ResponseMode,
    /// Zero-sized marker that ties the response type `R` to this payload; it is not serialized.
    #[serde(skip)]
    marker0: std::marker::PhantomData<R>,
    /// Zero-sized marker that ties the error type `E` to this payload; it is not serialized.
    #[serde(skip)]
    marker1: std::marker::PhantomData<E>,
}

impl<D: AppData, R: AppDataResponse, E: AppError> ClientPayload<D, R, E> {
    /// Create a new client payload instance with a normal entry type.
    pub fn new(entry: EntryNormal<D>, response_mode: ResponseMode) -> Self {
        Self::new_base(EntryPayload::Normal(entry), response_mode)
    }

    /// Create a new instance.
    pub(crate) fn new_base(entry: EntryPayload<D>, response_mode: ResponseMode) -> Self {
        Self{entry, response_mode, marker0: std::marker::PhantomData, marker1: std::marker::PhantomData}
    }

    /// Generate a new payload holding a config change.
    pub(crate) fn new_config(membership: MembershipConfig) -> Self {
        Self::new_base(EntryPayload::ConfigChange(EntryConfigChange{membership}), ResponseMode::Committed)
    }

    /// Generate a new blank payload.
    ///
    /// This is used by new leaders when first coming to power.
    pub(crate) fn new_blank_payload() -> Self {
        Self::new_base(EntryPayload::Blank, ResponseMode::Committed)
    }
}

impl<D, R, E> Message for ClientPayload<D, R, E>
where
    D: AppData,
    R: AppDataResponse,
    E: AppError,
{
    /// Outcome of submitting a client payload: a `ClientPayloadResponse` on success, otherwise a
    /// `ClientError` describing why the payload was not handled.
    type Result = Result<ClientPayloadResponse<R>, ClientError<D, R, E>>;
}

/// The desired response mode for a client request.
///
/// This value specifies when a client request desires to receive its response from Raft. When
/// `Committed` is chosen, the client request will receive a response after the request has been
/// successfully replicated to a majority of the nodes in the cluster. This is what the Raft
/// protocol refers to as being committed.
///
/// When `Applied` is chosen, the client request will receive a response after the request has
/// been successfully committed and successfully applied to the state machine.
///
/// The choice between these two options depends on the requirements related to the request. If
/// the data of the client request payload will need to be read immediately after the response is
/// received, then `Applied` must be used. If there is no requirement that the data must be
/// immediately read after receiving a response, then `Committed` may be used to speed up
/// response times.
///
/// This is a fieldless enum, so it is `Copy` and can be compared with `==`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ResponseMode {
    /// A response will be returned after the request has been committed to the cluster.
    Committed,
    /// A response will be returned after the request has been applied to the leader's state machine.
    Applied,
}

/// A response to a client payload proposed to the Raft system.
///
/// This is the success value of the `Result` produced when the Raft actor handles a
/// `ClientPayload`. Both variants report the log index via `ClientPayloadResponse::index`.
#[derive(Debug, Serialize, Deserialize)]
pub enum ClientPayloadResponse<R: AppDataResponse> {
    /// A client response issued just after the request was committed to the cluster.
    Committed {
        /// The log index of the successfully processed client request.
        index: u64,
    },
    /// A client response issued once the request has been applied, carrying the
    /// application's response data.
    Applied {
        /// The log index of the successfully processed client request.
        index: u64,
        /// Application specific response data.
        #[serde(bound="R: AppDataResponse")]
        data: R,
    },
}

impl<R: AppDataResponse> ClientPayloadResponse<R> {
    /// Log index of the entry this response refers to, whichever variant it is.
    pub fn index(&self) -> u64 {
        match self {
            Self::Committed { index } | Self::Applied { index, .. } => *index,
        }
    }
}

/// Error variants which may arise while handling client requests.
///
/// This is the error value of the `Result` produced when the Raft actor handles a
/// `ClientPayload`.
// NOTE(review): `#[serde(tag="type")]` selects serde's internally tagged representation, which
// cannot serialize a newtype variant whose inner value is not a struct or map. Confirm that every
// `AppError` used with `Application(E)` serializes as a struct/map.
#[derive(Debug, Serialize, Deserialize)]
#[serde(tag="type")]
pub enum ClientError<D: AppData, R: AppDataResponse, E: AppError> {
    /// Some error which has taken place internally in Raft.
    Internal,
    /// An application specific error.
    #[serde(bound="E: AppError")]
    Application(E),
    /// The Raft node returning this error is not the Raft leader.
    ///
    /// Forward the payload to the specified leader. If the leader is unknown, it is up to the
    /// application to determine how to handle. The payload can be buffered in the app until the
    /// new leader is known, or it can be returned to the client as an error and the client can be
    /// instructed to send to a new random node until the leader is known.
    ///
    /// The process of electing a new leader is usually a very fast process in Raft, so buffering
    /// the client payload until the new leader is known should not cause a lot of overhead.
    #[serde(bound="D: AppData, R: AppDataResponse, E: AppError")]
    ForwardToLeader {
        /// The original payload which this error is associated with.
        payload: ClientPayload<D, R, E>,
        /// The ID of the current Raft leader, if known.
        leader: Option<NodeId>,
    },
}

impl<D, R, E> std::fmt::Display for ClientError<D, R, E>
where
    D: AppData,
    R: AppDataResponse,
    E: AppError,
{
    /// Renders a fixed message for Raft-originated errors. For application errors, it writes the
    /// wrapped error's own `Display` output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Application(err) => write!(f, "{}", err),
            Self::Internal => f.write_str("An internal error was encountered in Raft."),
            Self::ForwardToLeader { .. } => {
                f.write_str("The client payload must be forwarded to the Raft leader for processing.")
            }
        }
    }
}

impl<D: AppData, R: AppDataResponse, E: AppError> std::error::Error for ClientError<D, R, E> {}