1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368
//! The RaftStorage interface and message types. use std::sync::Arc; use actix::{ dev::ToEnvelope, prelude::*, }; use futures::sync::{mpsc::UnboundedReceiver, oneshot::Sender}; use serde::{Serialize, Deserialize}; use crate::{ AppData, AppDataResponse, AppError, NodeId, messages, }; ////////////////////////////////////////////////////////////////////////////// // GetInitialState /////////////////////////////////////////////////////////// /// A request from Raft to get Raft's state information from storage. /// /// When the Raft actor is first started, it will call this interface on the storage system to /// fetch the last known state from stable storage. If no such entry exists due to being the /// first time the node has come online, then the default value for `InitialState` should be used. /// /// ### pro tip /// The storage impl may need to look in a few different places to accurately respond to this /// request. That last entry in the log for `last_log_index` & `last_log_term`; the node's hard /// state record; and the index of the last log applied to the state machine. pub struct GetInitialState<E: AppError> { marker: std::marker::PhantomData<E>, } impl<E: AppError> GetInitialState<E> { // Create a new instance. pub fn new() -> Self { Self{marker: std::marker::PhantomData} } } impl<E: AppError> Message for GetInitialState<E> { type Result = Result<InitialState, E>; } /// A struct used to represent the initial state which a Raft node needs when first starting. #[derive(Clone, Debug)] pub struct InitialState { /// The index of the last entry. pub last_log_index: u64, /// The term of the last log entry. pub last_log_term: u64, /// The index of the last log applied to the state machine. pub last_applied_log: u64, /// The saved hard state of the node. pub hard_state: HardState, } ////////////////////////////////////////////////////////////////////////////////////////////////// // GetLogEntries ///////////////////////////////////////////////////////////////////////////////// /// A request from Raft to get a series of log entries from storage. /// /// The start value is inclusive in the search and the stop value is non-inclusive: /// `[start, stop)`. pub struct GetLogEntries<D: AppData, E: AppError> { pub start: u64, pub stop: u64, marker_data: std::marker::PhantomData<D>, marker_error: std::marker::PhantomData<E>, } impl<D: AppData, E: AppError> GetLogEntries<D, E> { // Create a new instance. pub fn new(start: u64, stop: u64) -> Self { Self{start, stop, marker_data: std::marker::PhantomData, marker_error: std::marker::PhantomData} } } impl<D: AppData, E: AppError> Message for GetLogEntries<D, E> { type Result = Result<Vec<messages::Entry<D>>, E>; } ////////////////////////////////////////////////////////////////////////////////////////////////// // AppendEntryToLog ////////////////////////////////////////////////////////////////////////////// /// A request from Raft to append a new entry to the log. /// /// These requests come about via client requests, and as such, this is the only RaftStorage /// interface which is allowed to return errors which will not cause Raft to shutdown. Application /// errors coming from this interface will be sent back as-is to the call point where your /// application originally presented the client request to Raft. /// /// This property of error handling allows you to keep your application logic as close to the /// storage layer as needed. pub struct AppendEntryToLog<D: AppData, E: AppError> { pub entry: Arc<messages::Entry<D>>, marker: std::marker::PhantomData<E>, } impl<D: AppData, E: AppError> AppendEntryToLog<D, E> { // Create a new instance. pub fn new(entry: Arc<messages::Entry<D>>) -> Self { Self{entry, marker: std::marker::PhantomData} } } impl<D: AppData, E: AppError> Message for AppendEntryToLog<D, E> { type Result = Result<(), E>; } ////////////////////////////////////////////////////////////////////////////////////////////////// // ReplicateToLog //////////////////////////////////////////////////////////////////////////////// /// A request from Raft to replicate a payload of entries to the log. /// /// These requests come about via the Raft leader's replication process. An error coming from this /// interface will cause Raft to shutdown, as this is not where application logic should be /// returning application specific errors. Application specific constraints may only be enforced /// in the `AppendEntryToLog` handler. /// /// Though the entries will always be presented in order, each entry's index should be used to /// determine its location to be written in the log, as logs may need to be overwritten under /// some circumstances. pub struct ReplicateToLog<D: AppData, E: AppError> { pub entries: Arc<Vec<messages::Entry<D>>>, marker: std::marker::PhantomData<E>, } impl<D: AppData, E: AppError> ReplicateToLog<D, E> { // Create a new instance. pub fn new(entries: Arc<Vec<messages::Entry<D>>>) -> Self { Self{entries, marker: std::marker::PhantomData} } } impl<D: AppData, E: AppError> Message for ReplicateToLog<D, E> { type Result = Result<(), E>; } ////////////////////////////////////////////////////////////////////////////////////////////////// // ApplyEntryToStateMachine ////////////////////////////////////////////////////////////////////// /// A request from Raft to apply the given log entry to the state machine. /// /// This handler is called as part of the client request path. Client requests which are /// configured to respond after they have been `Applied` will wait until after this handler /// returns before issuing a response to the client request. /// /// The Raft protocol guarantees that only logs which have been _committed_, that is, logs which /// have been replicated to a majority of the cluster, will be applied to the state machine. pub struct ApplyEntryToStateMachine<D: AppData, R: AppDataResponse, E: AppError> { pub payload: Arc<messages::Entry<D>>, marker0: std::marker::PhantomData<R>, marker1: std::marker::PhantomData<E>, } impl<D: AppData, R: AppDataResponse, E: AppError> ApplyEntryToStateMachine<D, R, E> { // Create a new instance. pub fn new(payload: Arc<messages::Entry<D>>) -> Self { Self{payload, marker0: std::marker::PhantomData, marker1: std::marker::PhantomData} } } impl<D: AppData, R: AppDataResponse, E: AppError> Message for ApplyEntryToStateMachine<D, R, E> { type Result = Result<R, E>; } ////////////////////////////////////////////////////////////////////////////////////////////////// // ReplicateToStateMachine /////////////////////////////////////////////////////////////////////// /// A request from Raft to apply the given payload of entries to the state machine, as part of replication. /// /// The Raft protocol guarantees that only logs which have been _committed_, that is, logs which /// have been replicated to a majority of the cluster, will be applied to the state machine. pub struct ReplicateToStateMachine<D: AppData, E: AppError> { pub payload: Vec<messages::Entry<D>>, marker: std::marker::PhantomData<E>, } impl<D: AppData, E: AppError> ReplicateToStateMachine<D, E> { // Create a new instance. pub fn new(payload: Vec<messages::Entry<D>>) -> Self { Self{payload, marker: std::marker::PhantomData} } } impl<D: AppData, E: AppError> Message for ReplicateToStateMachine<D, E> { type Result = Result<(), E>; } ////////////////////////////////////////////////////////////////////////////////////////////////// // CreateSnapshot //////////////////////////////////////////////////////////////////////////////// /// A request from Raft to have a new snapshot created which covers the current breadth /// of the log. /// /// See the [storage chapter of the guide](https://railgun-rs.github.io/actix-raft/storage.html#CreateSnapshot) /// for details on how to implement this handler. pub struct CreateSnapshot<E: AppError> { /// The new snapshot should start from entry `0` and should cover all entries through the /// index specified here, inclusive. pub through: u64, marker: std::marker::PhantomData<E>, } impl<E: AppError> CreateSnapshot<E> { // Create a new instance. pub fn new(through: u64) -> Self { Self{through, marker: std::marker::PhantomData} } } impl<E: AppError> Message for CreateSnapshot<E> { type Result = Result<CurrentSnapshotData, E>; } ////////////////////////////////////////////////////////////////////////////////////////////////// // InstallSnapshot /////////////////////////////////////////////////////////////////////////////// /// A request from Raft to have a new snapshot written to disk and installed. /// /// See the [storage chapter of the guide](https://railgun-rs.github.io/actix-raft/storage.html#InstallSnapshot) /// for details on how to implement this handler. pub struct InstallSnapshot<E: AppError> { /// The term which the final entry of this snapshot covers. pub term: u64, /// The index of the final entry which this snapshot covers. pub index: u64, /// A stream of data chunks for this snapshot. pub stream: UnboundedReceiver<InstallSnapshotChunk>, marker: std::marker::PhantomData<E>, } impl<E: AppError> InstallSnapshot<E> { // Create a new instance. pub fn new(term: u64, index: u64, stream: UnboundedReceiver<InstallSnapshotChunk>) -> Self { Self{term, index, stream, marker: std::marker::PhantomData} } } impl<E: AppError> Message for InstallSnapshot<E> { type Result = Result<(), E>; } /// A chunk of snapshot data. pub struct InstallSnapshotChunk { /// The byte offset where chunk is positioned in the snapshot file. pub offset: u64, /// The raw bytes of the snapshot chunk, starting at `offset`. pub data: Vec<u8>, /// Will be `true` if this is the last chunk in the snapshot. pub done: bool, /// A callback channel to indicate when the chunk has been successfully written. pub cb: Sender<()>, } ////////////////////////////////////////////////////////////////////////////////////////////////// // GetCurrentSnapshot //////////////////////////////////////////////////////////////////////////// /// A request from Raft to get metadata of the current snapshot. /// /// ### implementation algorithm /// Implementation for this type's handler should be quite simple. Check the configured snapshot /// directory for any snapshot files. A proper implementation will only ever have one /// active snapshot, though another may exist while it is being created. As such, it is /// recommended to use a file naming pattern which will allow for easily distinguishing between /// the current live snapshot, and any new snapshot which is being created. pub struct GetCurrentSnapshot<E: AppError> { marker: std::marker::PhantomData<E>, } impl<E: AppError> GetCurrentSnapshot<E> { // Create a new instance. pub fn new() -> Self { Self{marker: std::marker::PhantomData} } } impl<E: AppError> Message for GetCurrentSnapshot<E> { type Result = Result<Option<CurrentSnapshotData>, E>; } /// The data associated with the current snapshot. #[derive(Clone, Debug, PartialEq)] pub struct CurrentSnapshotData { /// The snapshot entry's term. pub term: u64, /// The snapshot entry's index. pub index: u64, /// The latest membership configuration covered by the snapshot. pub membership: messages::MembershipConfig, /// The snapshot entry's pointer to the snapshot file. pub pointer: messages::EntrySnapshotPointer, } ////////////////////////////////////////////////////////////////////////////////////////////////// // SaveHardState ///////////////////////////////////////////////////////////////////////////////// /// A request from Raft to save its HardState. pub struct SaveHardState<E: AppError>{ pub hs: HardState, marker: std::marker::PhantomData<E>, } impl<E: AppError> SaveHardState<E> { // Create a new instance. pub fn new(hs: HardState) -> Self { Self{hs, marker: std::marker::PhantomData} } } impl<E: AppError> Message for SaveHardState<E> { type Result = Result<(), E>; } /// A record holding the hard state of a Raft node. /// /// This model derives serde's traits for easily (de)serializing this /// model for storage & retrieval. #[derive(Clone, Debug, Serialize, Deserialize)] pub struct HardState { /// The last recorded term observed by this system. pub current_term: u64, /// The ID of the node voted for in the `current_term`. pub voted_for: Option<NodeId>, /// The cluster membership configuration. pub membership: messages::MembershipConfig, } ////////////////////////////////////////////////////////////////////////////////////////////////// // RaftStorage /////////////////////////////////////////////////////////////////////////////////// /// A trait defining the interface of a Raft storage actor. /// /// See the [storage chapter of the guide](https://railgun-rs.github.io/actix-raft/storage.html#InstallSnapshot) /// for details and discussion on this trait and how to implement it. pub trait RaftStorage<D, R, E>: 'static where D: AppData, R: AppDataResponse, E: AppError, { /// The type to use as the storage actor. Should just be Self. type Actor: Actor<Context=Self::Context> + Handler<GetInitialState<E>> + Handler<SaveHardState<E>> + Handler<GetLogEntries<D, E>> + Handler<AppendEntryToLog<D, E>> + Handler<ReplicateToLog<D, E>> + Handler<ApplyEntryToStateMachine<D, R, E>> + Handler<ReplicateToStateMachine<D, E>> + Handler<CreateSnapshot<E>> + Handler<InstallSnapshot<E>> + Handler<GetCurrentSnapshot<E>>; /// The type to use as the storage actor's context. Should be `Context<Self>` or `SyncContext<Self>`. type Context: ActorContext + ToEnvelope<Self::Actor, GetInitialState<E>> + ToEnvelope<Self::Actor, SaveHardState<E>> + ToEnvelope<Self::Actor, GetLogEntries<D, E>> + ToEnvelope<Self::Actor, AppendEntryToLog<D, E>> + ToEnvelope<Self::Actor, ReplicateToLog<D, E>> + ToEnvelope<Self::Actor, ApplyEntryToStateMachine<D, R, E>> + ToEnvelope<Self::Actor, ReplicateToStateMachine<D, E>> + ToEnvelope<Self::Actor, CreateSnapshot<E>> + ToEnvelope<Self::Actor, InstallSnapshot<E>> + ToEnvelope<Self::Actor, GetCurrentSnapshot<E>>; }