mosaik 0.3.17

A Rust runtime for building self-organizing, leaderless distributed systems.
Documentation
use {
	crate::{
		PeerId,
		groups::{
			Command,
			Cursor,
			Index,
			IndexRange,
			StateMachine,
			StateSync,
			Term,
		},
		primitives::{Encoded, Short},
	},
	derive_more::Display,
	serde::{Deserialize, Serialize},
};

/// Raft messages as defined in the Raft consensus algorithm.
#[derive(Clone, Display, Serialize, Deserialize)]
#[serde(bound = "")]
pub enum Message<M: StateMachine> {
	/// Sent by leaders to assert authority (heartbeat) and replicate log
	/// entries. When `entries` is empty, this is a pure heartbeat.
	#[display(
		"AppendEntries[t={}/pos={}/n={}/c={}/{}]/",
		_0.term, _0.prev_log_position, _0.entries.len(),
		_0.leader_commit, Short(_0.leader)
	)]
	AppendEntries(AppendEntries<M::Command>),

	/// Response to an `AppendEntries` message.
	#[display("AppendEntriesResponse[t={}/success={}]", _0.term, _0.vote)]
	AppendEntriesResponse(AppendEntriesResponse),

	/// Sent by candidates to gather votes during an election.
	#[display("RequestVote[t={}/log={}]@{}", _0.term, _0.log_position, Short(_0.candidate))]
	RequestVote(RequestVote),

	/// Response to a `RequestVote` message.
	#[display("RequestVoteResponse[t={}/{}]", _0.term, _0.vote)]
	RequestVoteResponse(RequestVoteResponse),

	/// Messages related to forwarding client commands and queries from followers
	/// to the leader and acknowledging them.
	#[display("Forward(..)")]
	Forward(Forward<M>),

	/// Messages related to the log synchronization process during catch-up of
	/// lagging followers.
	#[display("StateSync(..)")]
	StateSync(<M::StateSync as StateSync>::Message),
}

impl<M: StateMachine> Message<M> {
	/// Returns the term carried by the message.
	pub const fn term(&self) -> Option<Term> {
		match self {
			Self::AppendEntries(msg) => Some(msg.term),
			Self::AppendEntriesResponse(msg) => Some(msg.term),
			Self::RequestVote(msg) => Some(msg.term),
			Self::RequestVoteResponse(msg) => Some(msg.term),
			Self::Forward(_) | Self::StateSync(_) => None,
		}
	}

	/// If the message was sent by a leader, returns its peer ID.
	pub const fn leader(&self) -> Option<PeerId> {
		match self {
			Self::AppendEntries(msg) => Some(msg.leader),
			Self::AppendEntriesResponse(_)
			| Self::RequestVote(_)
			| Self::RequestVoteResponse(_)
			| Self::Forward(_)
			| Self::StateSync(_) => None,
		}
	}
}

/// `RequestVote` Message arguments.
#[derive(Clone, Display, Serialize, Deserialize)]
#[display("{}[t{term}/log={log_position}]", Short(candidate))]
pub struct RequestVote {
	/// Candidate's term.
	pub term: Term,

	/// Candidate requesting vote.
	pub candidate: PeerId,

	/// Term and index of candidate's last log entry.
	pub log_position: Cursor,
}

#[derive(Debug, Clone, Display, Serialize, Deserialize)]
pub enum Vote {
	/// Vote granted to the candidate or the leader by a voting follower that is
	/// in sync with the log.
	Granted,

	/// Vote denied to the candidate during elections.
	Denied,

	/// Abstain from voting because the follower is lagging behind the leader
	/// or candidate's log progress, or because the node is an observer that
	/// does not participate in quorum. This removes the node from the quorum
	/// denominator. Lagging nodes become voting members again once they
	/// catch up; observers abstain permanently.
	Abstained,
}

/// `RequestVote` Message response.
#[derive(Debug, Clone, Display, Serialize, Deserialize)]
#[display("{vote}@{term}")]
pub struct RequestVoteResponse {
	/// Current term, for candidate to update itself.
	pub term: Term,

	/// The vote granted to the candidate.
	pub vote: Vote,
}

/// Log entry stored in the Raft log.
#[derive(Clone, Serialize, Deserialize)]
#[serde(bound = "")]
pub struct LogEntry<C: Command> {
	/// Term when entry was received by leader.
	pub term: Term,

	/// Command for replicated state machine. This is the application-specific
	/// state transition that is replicated across the group via the Raft log.
	pub command: Encoded<C>,
}

/// `AppendEntries` message arguments.
///
/// Sent by leader to replicate log entries and as heartbeat.
/// When `entries` is empty, this serves as a heartbeat to maintain leadership.
#[derive(Clone, Serialize, Deserialize)]
#[serde(bound = "")]
pub struct AppendEntries<C: Command> {
	/// Leader's term.
	pub term: Term,

	/// Leader's peer ID, so followers can redirect clients.
	pub leader: PeerId,

	/// Term and Index of log entry immediately preceding new ones.
	pub prev_log_position: Cursor,

	/// Log entries to store (empty for heartbeat).
	pub entries: Vec<LogEntry<C>>,

	/// Leader's commit index.
	pub leader_commit: Index,
}

/// `AppendEntries` message response.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AppendEntriesResponse {
	/// Current term, for leader to update itself.
	pub term: Term,

	/// The index of the last log entry the follower has after processing this
	/// `AppendEntries`. Used by the leader to determine which entries have
	/// been replicated to a majority and can be committed. Only meaningful
	/// when `vote` is `Granted`.
	pub last_log_index: Index,

	/// Granted if follower contained entry matching `prev_log_index` and
	/// `prev_log_term`. Abstain if the follower is lagging behind and cannot
	/// verify the log consistency, which will exclude it from the quorum until
	/// it catches up.
	pub vote: Vote,
}

/// Messages used for forwarding client commands from followers to the leader.
#[derive(Clone, Serialize, Deserialize)]
#[serde(bound = "")]
pub enum Forward<M: StateMachine> {
	/// Message sent by followers to the leader to forward client commands.
	Command {
		/// The command to be executed by the leader and replicated to the group.
		commands: Vec<Encoded<M::Command>>,

		/// Optionally when set, the leader will respond back to the sender with a
		/// `ForwardCommandResponse` containing the log index assigned to the
		/// appended command so that the follower can track the commit progress of
		/// the log and determine when the command has been committed to the state
		/// machine.
		///
		/// This value is set when calling `Group::execute` or `Group::feed` from a
		/// follower.
		///
		/// This value is randomly generated by the follower.
		request_id: Option<u64>,
	},

	/// Response sent by the leader to followers that forwarded client commands
	/// with a `request_id` to inform them of the log index assigned to the
	/// appended command so that the followers can track the commit progress of
	/// the log and determine when the command has been committed to the state
	/// machine.
	CommandAck {
		/// The `request_id` from the original `ForwardCommand` sent by the
		/// follower
		request_id: u64,

		/// The log indices assigned to the appended commands by the leader, which
		/// the follower can use to track the commit progress of the log and
		/// determine when the commands have been committed to the state machine.
		assigned: IndexRange,
	},

	/// Message sent by followers to the leader to query the state machine with
	/// strong consistency guarantees.
	Query {
		/// The state-machine specific query type to be executed by the leader.
		query: Encoded<M::Query>,

		/// Generated by the follower to correlate the query with the response
		/// from the leader.
		request_id: u64,
	},

	/// Response sent by the leader to followers that forwarded state machine
	/// queries with strong consistency requirement.
	QueryResponse {
		/// The `request_id` from the original `ForwardQuery` sent by the
		/// follower.
		request_id: u64,

		/// The result of executing the query against the leader's state machine,
		/// along with the committed log position at which the query was executed.
		result: Encoded<M::QueryResult>,

		/// The committed log index at which the leader executed the query against
		/// its state machine.
		position: Index,
	},
}