1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113
//! Common types and functionality used by the Raft actor. use std::sync::Arc; use futures::sync::oneshot; use crate::{ AppData, AppError, NodeId, messages::{ ClientError, ClientPayload, ClientPayloadResponse, Entry, ResponseMode, }, }; pub(crate) const CLIENT_RPC_RX_ERR: &str = "Client RPC channel receiver was unexpectedly closed."; pub(crate) const CLIENT_RPC_TX_ERR: &str = "Client RPC channel sender was unexpectedly closed."; ////////////////////////////////////////////////////////////////////////////////////////////////// // ApplyLogsTask ///////////////////////////////////////////////////////////////////////////////// /// A task declaring some set of logs which need to be applied to the state machine. pub(crate) enum ApplyLogsTask<D: AppData, E: AppError> { /// Check for & apply any logs which have been committed but which have not yet been applied. Outstanding, /// Logs which need to be applied are supplied for immediate use. Entry { /// The payload of logs to be applied. entry: Arc<Entry<D>>, /// The optional response channel for when this task is complete. chan: Option<oneshot::Sender<Result<ClientPayloadResponse, ClientError<D, E>>>>, }, } ////////////////////////////////////////////////////////////////////////////////////////////////// // ClientPayloadWithChan ///////////////////////////////////////////////////////////////////////// pub(crate) struct ClientPayloadWithChan<D: AppData, E: AppError> { pub tx: oneshot::Sender<Result<ClientPayloadResponse, ClientError<D, E>>>, pub rpc: ClientPayload<D, E>, } impl<D: AppData, E: AppError> ClientPayloadWithChan<D, E> { /// Upgrade a client payload with an assigned index & term. /// /// - `index`: the index to assign to this payload. /// - `term`: the term to assign to this payload. pub(crate) fn upgrade(self, index: u64, term: u64) -> ClientPayloadWithIndex<D, E> { ClientPayloadWithIndex::new(self, index, term) } } ////////////////////////////////////////////////////////////////////////////////////////////////// // ClientPayloadWithIndex ///////////////////////////////////////////////////////////////////////// /// A client payload which has made its way into the processing pipeline. pub(crate) struct ClientPayloadWithIndex<D: AppData, E: AppError> { /// The channel of the original client request. pub tx: oneshot::Sender<Result<ClientPayloadResponse, ClientError<D, E>>>, /// The entry of the original request with an assigned index & term, ready for storage. entry: Arc<Entry<D>>, /// The response mode of the original client request. pub response_mode: ResponseMode, /// The assigned log index of this payload. pub index: u64, /// The term associated with this payload. pub term: u64, } impl<D: AppData, E: AppError> ClientPayloadWithIndex<D, E> { /// Create a new instance. pub(self) fn new(payload: ClientPayloadWithChan<D, E>, index: u64, term: u64) -> Self { let entry = Arc::new(Entry{index: index, term: term, entry_type: payload.rpc.entry.clone()}); Self{tx: payload.tx, entry, response_mode: payload.rpc.response_mode, index, term} } /// Downgrade the payload, typically for forwarding purposes. pub(crate) fn downgrade(self) -> ClientPayloadWithChan<D, E> { let entry = match Arc::try_unwrap(self.entry) { Ok(entry) => entry.entry_type, Err(arc) => arc.entry_type.clone(), }; ClientPayloadWithChan{tx: self.tx, rpc: ClientPayload::new_base(entry, self.response_mode)} } /// Get a reference to the entry encapsulated by this payload. pub(crate) fn entry(&self) -> Arc<Entry<D>> { self.entry.clone() } } ////////////////////////////////////////////////////////////////////////////////////////////////// // DependencyAddr ///////////////////////////////////////////////////////////////////////////////// /// The set of dependency addr types used for tracking and reporting messaging errors. #[derive(Debug)] pub(crate) enum DependencyAddr { /// An addr of an internal actor which is not exposed to anything outside of this crate. RaftInternal, /// The `RaftNetwork` impl supplied to the Raft node. RaftNetwork, /// The `RaftStorage` impl supplied to the Raft node. RaftStorage, } ////////////////////////////////////////////////////////////////////////////////////////////////// // UpdateCurrentLeader /////////////////////////////////////////////////////////////////////////// /// An enum describing the way the current leader property is to be updated. pub(crate) enum UpdateCurrentLeader { Unknown, OtherNode(NodeId), ThisNode, }