use crate::error::{Error, Result};
use crate::types::{HardState, Index, LogEntry, NodeId, Snapshot, Term};
pub trait RaftLog {
fn last_index(&self) -> Index;
fn last_term(&self) -> Term;
fn term_at(&self, index: Index) -> Option<Term>;
fn entry(&self, index: Index) -> Option<LogEntry>;
fn entries(&self, from: Index, to: Index) -> Vec<LogEntry> {
if from == 0 || to < from {
return Vec::new();
}
let mut out = Vec::with_capacity((to - from + 1) as usize);
let mut index = from;
while index <= to {
if let Some(entry) = self.entry(index) {
out.push(entry);
}
index += 1;
}
out
}
fn append(&mut self, entries: &[LogEntry]) -> Result<()>;
fn truncate(&mut self, from: Index) -> Result<()>;
fn hard_state(&self) -> HardState;
fn set_hard_state(&mut self, state: HardState) -> Result<()>;
fn sync(&mut self) -> Result<()>;
fn snapshot_index(&self) -> Index {
0
}
fn snapshot(&self) -> Option<Snapshot> {
None
}
fn apply_snapshot(&mut self, snapshot: &Snapshot) -> Result<()> {
let _ = snapshot;
Err(Error::storage(
"apply snapshot",
"this log does not support snapshots",
))
}
}
#[derive(Clone, Debug, Default)]
pub struct MemoryLog {
entries: Vec<LogEntry>,
base_index: Index,
base_term: Term,
snapshot: Option<Vec<u8>>,
snapshot_config: Vec<NodeId>,
hard: HardState,
}
impl MemoryLog {
#[must_use]
pub fn new() -> Self {
Self::default()
}
#[inline]
#[must_use]
pub fn len(&self) -> usize {
self.entries.len()
}
#[inline]
#[must_use]
pub fn is_empty(&self) -> bool {
self.entries.is_empty()
}
}
impl MemoryLog {
#[inline]
fn slot(&self, index: Index) -> Option<usize> {
if index <= self.base_index || index > self.last_index() {
None
} else {
Some((index - self.base_index - 1) as usize)
}
}
}
impl RaftLog for MemoryLog {
#[inline]
fn last_index(&self) -> Index {
self.base_index + self.entries.len() as Index
}
#[inline]
fn last_term(&self) -> Term {
self.entries.last().map_or(self.base_term, |e| e.term)
}
fn term_at(&self, index: Index) -> Option<Term> {
if index == self.base_index {
return Some(self.base_term);
}
self.slot(index).map(|s| self.entries[s].term)
}
fn entry(&self, index: Index) -> Option<LogEntry> {
self.slot(index).map(|s| self.entries[s].clone())
}
fn entries(&self, from: Index, to: Index) -> Vec<LogEntry> {
if from == 0 {
return Vec::new();
}
let from = from.max(self.base_index + 1);
if to < from {
return Vec::new();
}
let start = (from - self.base_index - 1) as usize;
let end = ((to - self.base_index) as usize).min(self.entries.len());
if start >= end {
return Vec::new();
}
self.entries[start..end].to_vec()
}
fn append(&mut self, entries: &[LogEntry]) -> Result<()> {
if entries.is_empty() {
return Ok(());
}
let expected = self.last_index() + 1;
if entries[0].index != expected {
return Err(Error::storage(
"append entries",
format!(
"non-contiguous append: expected index {expected}, got {}",
entries[0].index
),
));
}
for pair in entries.windows(2) {
if pair[1].index != pair[0].index + 1 {
return Err(Error::storage(
"append entries",
"entries within the batch are not contiguous",
));
}
}
self.entries.extend_from_slice(entries);
Ok(())
}
fn truncate(&mut self, from: Index) -> Result<()> {
if from <= self.base_index {
return Err(Error::storage(
"truncate log",
"cannot truncate into the snapshot",
));
}
let keep = (from - self.base_index - 1) as usize;
if keep < self.entries.len() {
self.entries.truncate(keep);
}
Ok(())
}
#[inline]
fn hard_state(&self) -> HardState {
self.hard
}
#[inline]
fn set_hard_state(&mut self, state: HardState) -> Result<()> {
self.hard = state;
Ok(())
}
#[inline]
fn sync(&mut self) -> Result<()> {
Ok(())
}
#[inline]
fn snapshot_index(&self) -> Index {
self.base_index
}
fn snapshot(&self) -> Option<Snapshot> {
self.snapshot.as_ref().map(|data| {
Snapshot::with_config(
self.base_index,
self.base_term,
self.snapshot_config.clone(),
data.clone(),
)
})
}
fn apply_snapshot(&mut self, snapshot: &Snapshot) -> Result<()> {
if snapshot.index <= self.base_index {
return Ok(());
}
if self.term_at(snapshot.index) == Some(snapshot.term) {
let drop = ((snapshot.index - self.base_index) as usize).min(self.entries.len());
let _ = self.entries.drain(0..drop);
} else {
self.entries.clear();
}
self.base_index = snapshot.index;
self.base_term = snapshot.term;
self.snapshot = Some(snapshot.data.clone());
self.snapshot_config = snapshot.config.clone();
Ok(())
}
}
#[cfg(test)]
mod tests {
#![allow(clippy::unwrap_used, clippy::expect_used)]
use super::*;
fn entry(term: Term, index: Index) -> LogEntry {
LogEntry::new(term, index, vec![index as u8])
}
#[test]
fn test_empty_log_reports_zero() {
let log = MemoryLog::new();
assert_eq!(log.last_index(), 0);
assert_eq!(log.last_term(), 0);
assert!(log.is_empty());
assert_eq!(log.entry(0), None);
assert_eq!(log.entry(1), None);
}
#[test]
fn test_term_at_sentinel_is_zero() {
assert_eq!(MemoryLog::new().term_at(0), Some(0));
}
#[test]
fn test_append_and_read_back() {
let mut log = MemoryLog::new();
log.append(&[entry(1, 1), entry(1, 2)]).unwrap();
log.append(&[entry(2, 3)]).unwrap();
assert_eq!(log.last_index(), 3);
assert_eq!(log.last_term(), 2);
assert_eq!(log.term_at(2), Some(1));
assert_eq!(log.term_at(3), Some(2));
assert_eq!(log.term_at(4), None);
assert_eq!(log.entry(3).unwrap().term, 2);
}
#[test]
fn test_append_empty_is_noop() {
let mut log = MemoryLog::new();
log.append(&[]).unwrap();
assert_eq!(log.last_index(), 0);
}
#[test]
fn test_append_rejects_gap() {
let mut log = MemoryLog::new();
let err = log.append(&[entry(1, 2)]).unwrap_err();
assert!(matches!(err, Error::Storage { .. }));
}
#[test]
fn test_append_rejects_internally_noncontiguous_batch() {
let mut log = MemoryLog::new();
let err = log.append(&[entry(1, 1), entry(1, 3)]).unwrap_err();
assert!(matches!(err, Error::Storage { .. }));
}
#[test]
fn test_entries_range_inclusive() {
let mut log = MemoryLog::new();
log.append(&[entry(1, 1), entry(1, 2), entry(2, 3), entry(2, 4)])
.unwrap();
let mid = log.entries(2, 3);
assert_eq!(mid.len(), 2);
assert_eq!(mid[0].index, 2);
assert_eq!(mid[1].index, 3);
}
#[test]
fn test_entries_range_clamps_and_handles_empty() {
let mut log = MemoryLog::new();
log.append(&[entry(1, 1), entry(1, 2)]).unwrap();
assert_eq!(log.entries(1, 99).len(), 2);
assert!(log.entries(3, 2).is_empty());
assert!(log.entries(0, 5).is_empty());
assert!(log.entries(5, 9).is_empty());
}
#[test]
fn test_default_entries_matches_override() {
struct Wrap(MemoryLog);
impl RaftLog for Wrap {
fn last_index(&self) -> Index {
self.0.last_index()
}
fn last_term(&self) -> Term {
self.0.last_term()
}
fn term_at(&self, index: Index) -> Option<Term> {
self.0.term_at(index)
}
fn entry(&self, index: Index) -> Option<LogEntry> {
self.0.entry(index)
}
fn append(&mut self, entries: &[LogEntry]) -> Result<()> {
self.0.append(entries)
}
fn truncate(&mut self, from: Index) -> Result<()> {
self.0.truncate(from)
}
fn hard_state(&self) -> HardState {
self.0.hard_state()
}
fn set_hard_state(&mut self, state: HardState) -> Result<()> {
self.0.set_hard_state(state)
}
fn sync(&mut self) -> Result<()> {
self.0.sync()
}
}
let mut inner = MemoryLog::new();
inner
.append(&[entry(1, 1), entry(1, 2), entry(2, 3)])
.unwrap();
let wrap = Wrap(inner.clone());
assert_eq!(wrap.entries(1, 3), inner.entries(1, 3));
assert_eq!(wrap.entries(2, 2), inner.entries(2, 2));
}
#[test]
fn test_truncate_removes_tail() {
let mut log = MemoryLog::new();
log.append(&[entry(1, 1), entry(1, 2), entry(1, 3)])
.unwrap();
log.truncate(2).unwrap();
assert_eq!(log.last_index(), 1);
assert_eq!(log.entry(2), None);
}
#[test]
fn test_truncate_past_end_is_noop() {
let mut log = MemoryLog::new();
log.append(&[entry(1, 1)]).unwrap();
log.truncate(5).unwrap();
assert_eq!(log.last_index(), 1);
}
#[test]
fn test_truncate_zero_is_rejected() {
let mut log = MemoryLog::new();
assert!(log.truncate(0).is_err());
}
#[test]
fn test_hard_state_round_trips() {
let mut log = MemoryLog::new();
let hs = HardState {
term: 4,
voted_for: Some(2),
};
log.set_hard_state(hs).unwrap();
assert_eq!(log.hard_state(), hs);
}
#[test]
fn test_sync_is_ok() {
assert!(MemoryLog::new().sync().is_ok());
}
#[test]
fn test_apply_snapshot_compacts_and_keeps_matching_tail() {
let mut log = MemoryLog::new();
log.append(&[entry(1, 1), entry(1, 2), entry(2, 3), entry(2, 4)])
.unwrap();
log.apply_snapshot(&Snapshot::new(2, 1, b"state@2".to_vec()))
.unwrap();
assert_eq!(log.snapshot_index(), 2);
assert_eq!(log.last_index(), 4);
assert_eq!(log.entry(1), None);
assert_eq!(log.entry(2), None);
assert_eq!(log.term_at(2), Some(1)); assert_eq!(log.term_at(1), None); assert_eq!(log.entry(3).unwrap().term, 2);
assert_eq!(log.entry(4).unwrap().index, 4);
assert_eq!(log.snapshot().unwrap().data, b"state@2");
}
#[test]
fn test_apply_snapshot_clears_log_on_mismatch() {
let mut log = MemoryLog::new();
log.append(&[entry(1, 1), entry(1, 2)]).unwrap();
log.apply_snapshot(&Snapshot::new(5, 3, b"state@5".to_vec()))
.unwrap();
assert_eq!(log.snapshot_index(), 5);
assert_eq!(log.last_index(), 5);
assert_eq!(log.last_term(), 3);
assert!(log.entries(1, 5).is_empty());
assert_eq!(log.term_at(5), Some(3));
}
#[test]
fn test_append_continues_after_snapshot() {
let mut log = MemoryLog::new();
log.apply_snapshot(&Snapshot::new(7, 2, b"base".to_vec()))
.unwrap();
assert_eq!(log.last_index(), 7);
assert!(log.append(&[entry(2, 7)]).is_err()); log.append(&[entry(3, 8), entry(3, 9)]).unwrap();
assert_eq!(log.last_index(), 9);
assert_eq!(log.entry(8).unwrap().term, 3);
assert_eq!(log.term_at(7), Some(2)); }
#[test]
fn test_stale_snapshot_is_ignored() {
let mut log = MemoryLog::new();
log.apply_snapshot(&Snapshot::new(5, 2, b"new".to_vec()))
.unwrap();
log.apply_snapshot(&Snapshot::new(3, 1, b"old".to_vec()))
.unwrap();
assert_eq!(log.snapshot_index(), 5);
assert_eq!(log.snapshot().unwrap().data, b"new");
}
#[test]
fn test_truncate_into_snapshot_is_rejected() {
let mut log = MemoryLog::new();
log.apply_snapshot(&Snapshot::new(5, 2, b"s".to_vec()))
.unwrap();
assert!(log.truncate(5).is_err());
assert!(log.truncate(3).is_err());
}
#[test]
fn test_default_apply_snapshot_errors() {
struct NoSnap(MemoryLog);
impl RaftLog for NoSnap {
fn last_index(&self) -> Index {
self.0.last_index()
}
fn last_term(&self) -> Term {
self.0.last_term()
}
fn term_at(&self, index: Index) -> Option<Term> {
self.0.term_at(index)
}
fn entry(&self, index: Index) -> Option<LogEntry> {
self.0.entry(index)
}
fn append(&mut self, entries: &[LogEntry]) -> Result<()> {
self.0.append(entries)
}
fn truncate(&mut self, from: Index) -> Result<()> {
self.0.truncate(from)
}
fn hard_state(&self) -> HardState {
self.0.hard_state()
}
fn set_hard_state(&mut self, state: HardState) -> Result<()> {
self.0.set_hard_state(state)
}
fn sync(&mut self) -> Result<()> {
self.0.sync()
}
}
let mut log = NoSnap(MemoryLog::new());
assert_eq!(log.snapshot_index(), 0);
assert!(log.snapshot().is_none());
assert!(log.apply_snapshot(&Snapshot::new(1, 1, vec![])).is_err());
}
}