use crate::{log::LogEntry, log::Snapshot, LogIndex, NodeId, Term};
use serde::{Deserialize, Serialize};
/// Request payload for the AppendEntries RPC; carried by
/// [`RaftMessage::AppendEntriesRequest`].
///
/// A request whose `entries` list is empty is classified as a heartbeat by
/// [`AppendEntriesRequest::is_heartbeat`].
///
/// The type is serialized through serde, so the declaration order of the
/// fields is significant for positional formats such as bincode.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AppendEntriesRequest {
/// Term carried by this request.
pub term: Term,
/// Identifier of the leader named in this request.
pub leader_id: NodeId,
/// Log index paired with `prev_log_term`.
/// NOTE(review): presumably the index of the entry immediately preceding
/// `entries` (Raft's prevLogIndex) — confirm against the receiver.
pub prev_log_index: LogIndex,
/// Term paired with `prev_log_index`.
pub prev_log_term: Term,
/// Log entries carried by the request; empty for a heartbeat.
pub entries: Vec<LogEntry>,
/// Commit index supplied by the leader.
pub leader_commit: LogIndex,
}
impl AppendEntriesRequest {
pub fn new(
term: Term,
leader_id: NodeId,
prev_log_index: LogIndex,
prev_log_term: Term,
entries: Vec<LogEntry>,
leader_commit: LogIndex,
) -> Self {
Self {
term,
leader_id,
prev_log_index,
prev_log_term,
entries,
leader_commit,
}
}
pub fn heartbeat(term: Term, leader_id: NodeId, leader_commit: LogIndex) -> Self {
Self {
term,
leader_id,
prev_log_index: 0,
prev_log_term: 0,
entries: Vec::new(),
leader_commit,
}
}
pub fn is_heartbeat(&self) -> bool {
self.entries.is_empty()
}
pub fn to_bytes(&self) -> Result<Vec<u8>, bincode::error::EncodeError> {
use bincode::config;
bincode::encode_to_vec(bincode::serde::Compat(self), config::standard())
}
pub fn from_bytes(bytes: &[u8]) -> Result<Self, bincode::error::DecodeError> {
use bincode::config;
let (compat, _): (bincode::serde::Compat<Self>, _) =
bincode::decode_from_slice(bytes, config::standard())?;
Ok(compat.0)
}
}
/// Response payload for the AppendEntries RPC; carried by
/// [`RaftMessage::AppendEntriesResponse`].
///
/// The constructors in this file populate either `match_index` (on success)
/// or the `conflict_*` hints (on failure), never both.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AppendEntriesResponse {
/// Term carried by this response.
pub term: Term,
/// Whether the request was accepted.
pub success: bool,
/// Set to `Some` by [`AppendEntriesResponse::success`]; `None` on failure.
pub match_index: Option<LogIndex>,
/// Optional index hint supplied by [`AppendEntriesResponse::failure`];
/// `None` on success.
pub conflict_index: Option<LogIndex>,
/// Optional term hint supplied by [`AppendEntriesResponse::failure`];
/// `None` on success.
pub conflict_term: Option<Term>,
}
impl AppendEntriesResponse {
pub fn success(term: Term, match_index: LogIndex) -> Self {
Self {
term,
success: true,
match_index: Some(match_index),
conflict_index: None,
conflict_term: None,
}
}
pub fn failure(
term: Term,
conflict_index: Option<LogIndex>,
conflict_term: Option<Term>,
) -> Self {
Self {
term,
success: false,
match_index: None,
conflict_index,
conflict_term,
}
}
pub fn to_bytes(&self) -> Result<Vec<u8>, bincode::error::EncodeError> {
use bincode::config;
bincode::encode_to_vec(bincode::serde::Compat(self), config::standard())
}
pub fn from_bytes(bytes: &[u8]) -> Result<Self, bincode::error::DecodeError> {
use bincode::config;
let (compat, _): (bincode::serde::Compat<Self>, _) =
bincode::decode_from_slice(bytes, config::standard())?;
Ok(compat.0)
}
}
/// Request payload for the RequestVote RPC; carried by
/// [`RaftMessage::RequestVoteRequest`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RequestVoteRequest {
/// Term carried by this request.
pub term: Term,
/// Identifier of the candidate named in this request.
pub candidate_id: NodeId,
/// Log index paired with `last_log_term`.
/// NOTE(review): presumably the index of the candidate's last log entry —
/// confirm against the caller.
pub last_log_index: LogIndex,
/// Term paired with `last_log_index`.
pub last_log_term: Term,
}
impl RequestVoteRequest {
pub fn new(
term: Term,
candidate_id: NodeId,
last_log_index: LogIndex,
last_log_term: Term,
) -> Self {
Self {
term,
candidate_id,
last_log_index,
last_log_term,
}
}
pub fn to_bytes(&self) -> Result<Vec<u8>, bincode::error::EncodeError> {
use bincode::config;
bincode::encode_to_vec(bincode::serde::Compat(self), config::standard())
}
pub fn from_bytes(bytes: &[u8]) -> Result<Self, bincode::error::DecodeError> {
use bincode::config;
let (compat, _): (bincode::serde::Compat<Self>, _) =
bincode::decode_from_slice(bytes, config::standard())?;
Ok(compat.0)
}
}
/// Response payload for the RequestVote RPC; carried by
/// [`RaftMessage::RequestVoteResponse`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RequestVoteResponse {
/// Term carried by this response.
pub term: Term,
/// `true` when built by [`RequestVoteResponse::granted`], `false` when
/// built by [`RequestVoteResponse::denied`].
pub vote_granted: bool,
}
impl RequestVoteResponse {
pub fn granted(term: Term) -> Self {
Self {
term,
vote_granted: true,
}
}
pub fn denied(term: Term) -> Self {
Self {
term,
vote_granted: false,
}
}
pub fn to_bytes(&self) -> Result<Vec<u8>, bincode::error::EncodeError> {
use bincode::config;
bincode::encode_to_vec(bincode::serde::Compat(self), config::standard())
}
pub fn from_bytes(bytes: &[u8]) -> Result<Self, bincode::error::DecodeError> {
use bincode::config;
let (compat, _): (bincode::serde::Compat<Self>, _) =
bincode::decode_from_slice(bytes, config::standard())?;
Ok(compat.0)
}
}
/// Request payload for the InstallSnapshot RPC; carried by
/// [`RaftMessage::InstallSnapshotRequest`].
///
/// Each request carries one chunk of a snapshot's data; see
/// [`InstallSnapshotRequest::new`] for how a chunk is cut.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct InstallSnapshotRequest {
/// Term carried by this request.
pub term: Term,
/// Identifier of the leader named in this request.
pub leader_id: NodeId,
/// Copied from the snapshot's `last_included_index`.
pub last_included_index: LogIndex,
/// Copied from the snapshot's `last_included_term`.
pub last_included_term: Term,
/// Byte offset within the snapshot data at which `data` begins.
pub offset: u64,
/// The bytes of this chunk.
pub data: Vec<u8>,
/// `true` when this chunk reaches the end of the snapshot data.
pub done: bool,
}
impl InstallSnapshotRequest {
    /// Builds the request carrying the chunk of `snapshot.data` that starts
    /// at byte `offset`.
    ///
    /// The chunk holds at most `chunk_size` bytes and is cut short at the end
    /// of the snapshot data. `done` is set when the chunk reaches the end of
    /// the data.
    ///
    /// An `offset` at or beyond the end of the data yields an empty chunk
    /// with `done == true` instead of panicking. The `offset` field always
    /// echoes the value passed in.
    ///
    /// With `chunk_size == 0` and an `offset` before the end of the data, the
    /// chunk is empty and `done` is `false`; callers that loop until `done`
    /// must pass a non-zero `chunk_size`.
    pub fn new(
        term: Term,
        leader_id: NodeId,
        snapshot: Snapshot,
        offset: u64,
        chunk_size: usize,
    ) -> Self {
        let data_len = snapshot.data.len();
        // Clamp in u64 space first: the result is <= data_len, so narrowing
        // back to usize is lossless even where usize is narrower than u64.
        let chunk_start = std::cmp::min(offset, data_len as u64) as usize;
        // Saturate so a huge `chunk_size` cannot overflow the addition.
        let chunk_end = std::cmp::min(chunk_start.saturating_add(chunk_size), data_len);
        // chunk_start <= chunk_end <= data_len holds here, so slicing cannot panic.
        let chunk = snapshot.data[chunk_start..chunk_end].to_vec();
        let done = chunk_end >= data_len;
        Self {
            term,
            leader_id,
            last_included_index: snapshot.last_included_index,
            last_included_term: snapshot.last_included_term,
            offset,
            data: chunk,
            done,
        }
    }

    /// Encodes this request with bincode's standard configuration, going
    /// through the serde compatibility wrapper.
    ///
    /// # Errors
    ///
    /// Returns the `EncodeError` produced by bincode when encoding fails.
    pub fn to_bytes(&self) -> Result<Vec<u8>, bincode::error::EncodeError> {
        use bincode::config;
        bincode::encode_to_vec(bincode::serde::Compat(self), config::standard())
    }

    /// Decodes a request from `bytes` using bincode's standard configuration.
    ///
    /// The consumed-byte count returned by bincode is discarded.
    ///
    /// # Errors
    ///
    /// Returns the `DecodeError` produced by bincode when decoding fails.
    pub fn from_bytes(bytes: &[u8]) -> Result<Self, bincode::error::DecodeError> {
        use bincode::config;
        let (compat, _): (bincode::serde::Compat<Self>, _) =
            bincode::decode_from_slice(bytes, config::standard())?;
        Ok(compat.0)
    }
}
/// Response payload for the InstallSnapshot RPC; carried by
/// [`RaftMessage::InstallSnapshotResponse`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct InstallSnapshotResponse {
/// Term carried by this response.
pub term: Term,
/// Whether the request was accepted.
pub success: bool,
/// Optional offset supplied by [`InstallSnapshotResponse::success`];
/// always `None` when built by [`InstallSnapshotResponse::failure`].
/// NOTE(review): presumably the offset the sender should continue from —
/// confirm against the caller.
pub next_offset: Option<u64>,
}
impl InstallSnapshotResponse {
pub fn success(term: Term, next_offset: Option<u64>) -> Self {
Self {
term,
success: true,
next_offset,
}
}
pub fn failure(term: Term) -> Self {
Self {
term,
success: false,
next_offset: None,
}
}
pub fn to_bytes(&self) -> Result<Vec<u8>, bincode::error::EncodeError> {
use bincode::config;
bincode::encode_to_vec(bincode::serde::Compat(self), config::standard())
}
pub fn from_bytes(bytes: &[u8]) -> Result<Self, bincode::error::DecodeError> {
use bincode::config;
let (compat, _): (bincode::serde::Compat<Self>, _) =
bincode::decode_from_slice(bytes, config::standard())?;
Ok(compat.0)
}
}
/// Envelope with one variant per request/response type defined in this file.
///
/// The type is serialized through serde, so the declaration order of the
/// variants is significant for positional formats such as bincode.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum RaftMessage {
/// Wraps an [`AppendEntriesRequest`].
AppendEntriesRequest(AppendEntriesRequest),
/// Wraps an [`AppendEntriesResponse`].
AppendEntriesResponse(AppendEntriesResponse),
/// Wraps a [`RequestVoteRequest`].
RequestVoteRequest(RequestVoteRequest),
/// Wraps a [`RequestVoteResponse`].
RequestVoteResponse(RequestVoteResponse),
/// Wraps an [`InstallSnapshotRequest`].
InstallSnapshotRequest(InstallSnapshotRequest),
/// Wraps an [`InstallSnapshotResponse`].
InstallSnapshotResponse(InstallSnapshotResponse),
}
impl RaftMessage {
    /// Returns the `term` field of whichever message this envelope wraps.
    pub fn term(&self) -> Term {
        // Exhaustive on purpose: adding a variant must force a decision here.
        match self {
            Self::AppendEntriesRequest(msg) => msg.term,
            Self::AppendEntriesResponse(msg) => msg.term,
            Self::RequestVoteRequest(msg) => msg.term,
            Self::RequestVoteResponse(msg) => msg.term,
            Self::InstallSnapshotRequest(msg) => msg.term,
            Self::InstallSnapshotResponse(msg) => msg.term,
        }
    }

    /// Encodes this message with bincode's standard configuration, going
    /// through the serde compatibility wrapper.
    ///
    /// # Errors
    ///
    /// Returns the `EncodeError` produced by bincode when encoding fails.
    pub fn to_bytes(&self) -> Result<Vec<u8>, bincode::error::EncodeError> {
        let wrapped = bincode::serde::Compat(self);
        bincode::encode_to_vec(wrapped, bincode::config::standard())
    }

    /// Decodes a message from `bytes` using bincode's standard configuration.
    ///
    /// The consumed-byte count returned by bincode is discarded.
    ///
    /// # Errors
    ///
    /// Returns the `DecodeError` produced by bincode when decoding fails.
    pub fn from_bytes(bytes: &[u8]) -> Result<Self, bincode::error::DecodeError> {
        let decoded: Result<(bincode::serde::Compat<Self>, _), _> =
            bincode::decode_from_slice(bytes, bincode::config::standard());
        decoded.map(|(wrapped, _consumed)| wrapped.0)
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    // The heartbeat constructor must yield a request with no entries.
    #[test]
    fn test_append_entries_heartbeat() {
        let heartbeat = AppendEntriesRequest::heartbeat(1, String::from("leader"), 10);
        assert!(heartbeat.is_heartbeat());
        assert_eq!(heartbeat.entries.len(), 0);
    }

    // Encoding then decoding an AppendEntries request keeps term and leader.
    #[test]
    fn test_append_entries_serialization() {
        let original =
            AppendEntriesRequest::new(1, String::from("leader"), 10, 1, Vec::new(), 10);
        let encoded = original.to_bytes().unwrap();
        let round_tripped = AppendEntriesRequest::from_bytes(&encoded).unwrap();
        assert_eq!(original.term, round_tripped.term);
        assert_eq!(original.leader_id, round_tripped.leader_id);
    }

    // Encoding then decoding a vote request keeps term and candidate.
    #[test]
    fn test_request_vote_serialization() {
        let original = RequestVoteRequest::new(2, String::from("candidate"), 15, 2);
        let encoded = original.to_bytes().unwrap();
        let round_tripped = RequestVoteRequest::from_bytes(&encoded).unwrap();
        assert_eq!(original.term, round_tripped.term);
        assert_eq!(original.candidate_id, round_tripped.candidate_id);
    }

    // The success/failure constructors populate the expected fields.
    #[test]
    fn test_response_types() {
        let accepted = AppendEntriesResponse::success(1, 10);
        assert!(accepted.success);
        assert_eq!(accepted.match_index, Some(10));

        let rejected = AppendEntriesResponse::failure(1, Some(5), Some(1));
        assert!(!rejected.success);
        assert_eq!(rejected.conflict_index, Some(5));
    }

    // The granted/denied constructors set the vote flag accordingly.
    #[test]
    fn test_vote_responses() {
        let yes = RequestVoteResponse::granted(1);
        assert!(yes.vote_granted);

        let no = RequestVoteResponse::denied(1);
        assert!(!no.vote_granted);
    }
}