1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440
//! All public facing message types. //! //! For users of this Raft implementation, this module defines the data types of this crate's API. //! The `RaftNetwork` trait is based entirely off of these messages, and communication with the //! `Raft` actor is based entirely off of these messages and the messages in the `admin` module. use actix::prelude::*; use serde::{Serialize, Deserialize}; use crate::{AppData, AppDataResponse, AppError, NodeId}; ////////////////////////////////////////////////////////////////////////////////////////////////// // AppendEntriesRequest ////////////////////////////////////////////////////////////////////////// /// An RPC invoked by the leader to replicate log entries (§5.3); also used as heartbeat (§5.2). /// /// ### actix::Message /// Applications using this Raft implementation are responsible for implementing the /// networking/transport layer which must move RPCs between nodes. Once the application instance /// recieves a Raft RPC, it must send the RPC to the Raft node via its `actix::Addr` and then /// return the response to the original sender. /// /// The result type of calling the Raft actor with this message type is /// `Result<AppendEntriesResponse, ()>`. The Raft spec assigns no significance to failures during /// the handling or sending of RPCs and all RPCs are handled in an idempotent fashion, so Raft /// will almost always retry sending a failed RPC, depending on the state of the Raft. #[derive(Debug, Serialize, Deserialize)] pub struct AppendEntriesRequest<D: AppData> { /// A non-standard field, this is the ID of the intended recipient of this RPC. pub target: u64, /// The leader's current term. pub term: u64, /// The leader's ID. Useful in redirecting clients. pub leader_id: u64, /// The index of the log entry immediately preceding the new entries. pub prev_log_index: u64, /// The term of the `prev_log_index` entry. pub prev_log_term: u64, /// The new log entries to store. /// /// This may be empty when the leader is sending heartbeats. Entries /// may be batched for efficiency. #[serde(bound="D: AppData")] pub entries: Vec<Entry<D>>, /// The leader's commit index. pub leader_commit: u64, } impl<D: AppData> Message for AppendEntriesRequest<D> { /// The result type of this message. /// /// The `Result::Err` type is `()` as Raft assigns no significance to RPC failures, they will /// be retried almost always as long as permitted by the current state of the Raft. type Result = Result<AppendEntriesResponse, ()>; } /// An RPC response to an `AppendEntriesRequest` message. #[derive(Debug, Serialize, Deserialize)] pub struct AppendEntriesResponse { /// The responding node's current term, for leader to update itself. pub term: u64, /// Will be true if follower contained entry matching `prev_log_index` and `prev_log_term`. pub success: bool, /// A value used to implement the _conflicting term_ optimization outlined in §5.3. /// /// This value will only be present, and should only be considered, when `success` is `false`. pub conflict_opt: Option<ConflictOpt>, } /// A struct used to implement the _conflicting term_ optimization outlined in §5.3 for log replication. /// /// This value will only be present, and should only be considered, when an `AppendEntriesResponse` /// object has a `success` value of `false`. /// /// This implementation of Raft uses this value to more quickly synchronize a leader with its /// followers which may be some distance behind in replication, may have conflicting entries, or /// which may be new to the cluster. #[derive(Debug, Serialize, Deserialize)] pub struct ConflictOpt { /// The term of the most recent entry which does not conflict with the received request. pub term: u64, /// The index of the most recent entry which does not conflict with the received request. pub index: u64, } /// A Raft log entry. #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub struct Entry<D: AppData> { /// This entry's term. pub term: u64, /// This entry's index. pub index: u64, /// This entry's payload. #[serde(bound="D: AppData")] pub payload: EntryPayload<D>, } impl<D: AppData> Entry<D> { /// Create a new snapshot pointer from the given data. pub fn new_snapshot_pointer(pointer: EntrySnapshotPointer, index: u64, term: u64) -> Self { Entry{term, index, payload: EntryPayload::SnapshotPointer(pointer)} } } /// Log entry payload variants. #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub enum EntryPayload<D: AppData> { /// An empty payload committed by a new cluster leader. Blank, /// A normal log entry. #[serde(bound="D: AppData")] Normal(EntryNormal<D>), /// A config change log entry. ConfigChange(EntryConfigChange), /// An entry which points to a snapshot. SnapshotPointer(EntrySnapshotPointer), } /// A normal log entry. #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub struct EntryNormal<D: AppData> { /// The contents of this entry. #[serde(bound="D: AppData")] pub data: D, } /// A log entry holding a config change. #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub struct EntryConfigChange { /// The full list of node IDs to be considered cluster members as part of this config change. pub membership: MembershipConfig, } /// A log entry pointing to a snapshot. /// /// This will only be present when read from storage. An entry of this type will never be /// transmitted from a leader during replication, an `InstallSnapshotRequest` /// RPC will be sent instead. #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub struct EntrySnapshotPointer { /// The location of the snapshot file on disk. pub path: String, } ////////////////////////////////////////////////////////////////////////////////////////////////// // MembershipConfig ////////////////////////////////////////////////////////////////////////////// /// A model of the membership configuration of the cluster. #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] pub struct MembershipConfig { /// A flag indicating if the system is currently in a joint consensus state. pub is_in_joint_consensus: bool, /// Voting members of the Raft cluster. pub members: Vec<NodeId>, /// Non-voting members of the cluster. /// /// These nodes are being brought up-to-speed by the leader and will be transitioned over to /// being standard members once they are up-to-date. pub non_voters: Vec<NodeId>, /// The set of nodes which are to be removed after joint consensus is complete. pub removing: Vec<NodeId>, } impl MembershipConfig { /// Check if the given NodeId exists in this membership config. /// /// This checks only the contents of `members` & `non_voters`. pub fn contains(&self, x: &NodeId) -> bool { self.members.contains(x) || self.non_voters.contains(x) } /// Get an iterator over all nodes in the current config. pub fn all_nodes(&self) -> impl Iterator<Item=&NodeId> { self.members.iter().chain(self.non_voters.iter()) } /// Get the length of the members & non_voters vectors. pub fn len(&self) -> usize { self.members.len() + self.non_voters.len() } } ////////////////////////////////////////////////////////////////////////////////////////////////// // VoteRequest /////////////////////////////////////////////////////////////////////////////////// /// An RPC invoked by candidates to gather votes (§5.2). /// /// ### actix::Message /// Applications using this Raft implementation are responsible for implementing the /// networking/transport layer which must move RPCs between nodes. Once the application instance /// recieves a Raft RPC, it must send the RPC to the Raft node via its `actix::Addr` and then /// return the response to the original sender. /// /// The result type of calling the Raft actor with this message type is `Result<VoteResponse, ()>`. /// The Raft spec assigns no significance to failures during the handling or sending of RPCs and /// all RPCs are handled in an idempotent fashion, so Raft will almost always retry sending a /// failed RPC, depending on the state of the Raft. #[derive(Debug, Serialize, Deserialize)] pub struct VoteRequest { /// A non-standard field, this is the ID of the intended recipient of this RPC. pub target: u64, /// The candidate's current term. pub term: u64, /// The candidate's ID. pub candidate_id: u64, /// The index of the candidate’s last log entry (§5.4). pub last_log_index: u64, /// The term of the candidate’s last log entry (§5.4). pub last_log_term: u64, } impl Message for VoteRequest { /// The result type of this message. /// /// The `Result::Err` type is `()` as Raft assigns no significance to RPC failures, they will /// be retried almost always as long as permitted by the current state of the Raft. type Result = Result<VoteResponse, ()>; } impl VoteRequest { /// Create a new instance. pub fn new(target: u64, term: u64, candidate_id: u64, last_log_index: u64, last_log_term: u64) -> Self { Self{target, term, candidate_id, last_log_index, last_log_term} } } /// An RPC response to an `VoteResponse` message. #[derive(Debug, Serialize, Deserialize)] pub struct VoteResponse { /// The current term of the responding node, for the candidate to update itself. pub term: u64, /// Will be true if the candidate received a vote from the responder. pub vote_granted: bool, /// Will be true if the candidate is unknown to the responding node's config. /// /// If this field is true, and the sender's (the candidate's) index is greater than 0, then it /// should revert to the NonVoter state; if the sender's index is 0, then resume campaigning. pub is_candidate_unknown: bool, } ////////////////////////////////////////////////////////////////////////////////////////////////// // InstallSnapshotRequest //////////////////////////////////////////////////////////////////////// /// Invoked by the Raft leader to send chunks of a snapshot to a follower (§7). /// /// ### actix::Message /// Applications using this Raft implementation are responsible for implementing the /// networking/transport layer which must move RPCs between nodes. Once the application instance /// recieves a Raft RPC, it must send the RPC to the Raft node via its `actix::Addr` and then /// return the response to the original sender. /// /// The result type of calling the Raft actor with this message type is /// `Result<InstallSnapshotResponse, ()>`. The Raft spec assigns no significance to failures during /// the handling or sending of RPCs and all RPCs are handled in an idempotent fashion, so Raft will /// almost always retry sending a failed RPC, depending on the state of the Raft. #[derive(Clone, Debug, Serialize, Deserialize)] pub struct InstallSnapshotRequest { /// A non-standard field, this is the ID of the intended recipient of this RPC. pub target: u64, /// The leader's current term. pub term: u64, /// The leader's ID. Useful in redirecting clients. pub leader_id: u64, /// The snapshot replaces all log entries up through and including this index. pub last_included_index: u64, /// The term of the `last_included_index`. pub last_included_term: u64, /// The byte offset where chunk is positioned in the snapshot file. pub offset: u64, /// The raw Vec<u8> of the snapshot chunk, starting at `offset`. pub data: Vec<u8>, /// Will be `true` if this is the last chunk in the snapshot. pub done: bool, } impl Message for InstallSnapshotRequest { /// The result type of this message. /// /// The `Result::Err` type is `()` as Raft assigns no significance to RPC failures, they will /// almost always be retried as long as permitted by the current state of the Raft. type Result = Result<InstallSnapshotResponse, ()>; } /// An RPC response to an `InstallSnapshotResponse` message. #[derive(Debug, Serialize, Deserialize)] pub struct InstallSnapshotResponse { /// The receiving node's current term, for leader to update itself. pub term: u64, } ////////////////////////////////////////////////////////////////////////////////////////////////// // ClientPayload ///////////////////////////////////////////////////////////////////////////////// /// A payload with an entry coming from a client request. /// /// The entries of this payload will be appended to the Raft log and then applied to the Raft state /// machine according to the Raft protocol. /// /// ### actix::Message /// Applications using this Raft implementation are responsible for implementing the /// networking/transport layer which must move RPCs between nodes. Once the application instance /// recieves a Raft RPC, it must send the RPC to the Raft node via its `actix::Addr` and then /// return the response to the original sender. /// /// The result type of calling the Raft actor with this message type is /// `Result<ClientPayloadResponse, StorageError>`. Applications built around this implementation of /// Raft will often need to perform their own custom logic in the storage layer and often times it /// is critical to be able to surface such errors to the application and its clients. To meet that /// end, `ClientError` allows for the communication of application specific errors. #[derive(Debug, Serialize, Deserialize)] pub struct ClientPayload<D: AppData, R: AppDataResponse, E: AppError> { /// The application specific contents of this client request. #[serde(bound="D: AppData")] pub(crate) entry: EntryPayload<D>, /// The response mode needed by this request. pub(crate) response_mode: ResponseMode, #[serde(skip)] marker0: std::marker::PhantomData<R>, #[serde(skip)] marker1: std::marker::PhantomData<E>, } impl<D: AppData, R: AppDataResponse, E: AppError> ClientPayload<D, R, E> { /// Create a new client payload instance with a normal entry type. pub fn new(entry: EntryNormal<D>, response_mode: ResponseMode) -> Self { Self::new_base(EntryPayload::Normal(entry), response_mode) } /// Create a new instance. pub(crate) fn new_base(entry: EntryPayload<D>, response_mode: ResponseMode) -> Self { Self{entry, response_mode, marker0: std::marker::PhantomData, marker1: std::marker::PhantomData} } /// Generate a new payload holding a config change. pub(crate) fn new_config(membership: MembershipConfig) -> Self { Self::new_base(EntryPayload::ConfigChange(EntryConfigChange{membership}), ResponseMode::Committed) } /// Generate a new blank payload. /// /// This is used by new leaders when first coming to power. pub(crate) fn new_blank_payload() -> Self { Self::new_base(EntryPayload::Blank, ResponseMode::Committed) } } impl<D: AppData, R: AppDataResponse, E: AppError> Message for ClientPayload<D, R, E> { /// The result type of this message. type Result = Result<ClientPayloadResponse<R>, ClientError<D, R, E>>; } /// The desired response mode for a client request. /// /// This value specifies when a client request desires to receive its response from Raft. When /// `Comitted` is chosen, the client request will receive a response after the request has been /// successfully replicated to at least half of the nodes in the cluster. This is what the Raft /// protocol refers to as being comitted. /// /// When `Applied` is chosen, the client request will receive a response after the request has /// been successfully committed and successfully applied to the state machine. /// /// The choice between these two options depends on the requirements related to the request. If /// the data of the client request payload will need to be read immediately after the response is /// received, then `Applied` must be used. If there is no requirement that the data must be /// immediately read after receiving a response, then `Committed` may be used to speed up /// response times. #[derive(Debug, Serialize, Deserialize)] pub enum ResponseMode { /// A response will be returned after the request has been committed to the cluster. Committed, /// A response will be returned after the request has been applied to the leader's state machine. Applied, } /// A response to a client payload proposed to the Raft system. #[derive(Debug, Serialize, Deserialize)] pub enum ClientPayloadResponse<R: AppDataResponse> { /// A client response issued just after the request was committed to the cluster. Committed { /// The log index of the successfully processed client request. index: u64, }, Applied { /// The log index of the successfully processed client request. index: u64, /// Application specific response data. #[serde(bound="R: AppDataResponse")] data: R, }, } impl<R: AppDataResponse> ClientPayloadResponse<R> { /// The index of the log entry corresponding to this response object. pub fn index(&self) -> u64 { match self { Self::Committed{index} => *index, Self::Applied{index, ..} => *index, } } } /// Error variants which may arise while handling client requests. #[derive(Debug, Serialize, Deserialize)] #[serde(tag="type")] pub enum ClientError<D: AppData, R: AppDataResponse, E: AppError> { /// Some error which has taken place internally in Raft. Internal, /// An application specific error. #[serde(bound="E: AppError")] Application(E), /// The Raft node returning this error is not the Raft leader. /// /// Forward the payload to the specified leader. If the leader is unknown, it is up to the /// application to determine how to handle. The payload can be buffered in the app until the /// new leader is known, or it can be returned to the client as an error and the client can be /// instructed to send to a new random node until the leader is known. /// /// The process of electing a new leader is usually a very fast process in Raft, so buffering /// the client payload until the new leader is known should not cause a lot of overhead. #[serde(bound="D: AppData, R: AppDataResponse, E: AppError")] ForwardToLeader { /// The original payload which this error is associated with. payload: ClientPayload<D, R, E>, /// The ID of the current Raft leader, if known. leader: Option<NodeId>, }, } impl<D: AppData, R: AppDataResponse, E: AppError> std::fmt::Display for ClientError<D, R, E> { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { ClientError::Internal => write!(f, "An internal error was encountered in Raft."), ClientError::Application(err) => write!(f, "{}", &err), ClientError::ForwardToLeader{..} => write!(f, "The client payload must be forwarded to the Raft leader for processing."), } } } impl<D: AppData, R: AppDataResponse, E: AppError> std::error::Error for ClientError<D, R, E> {}