use std::ops::{Add, AddAssign, Sub, SubAssign};
pub use self::history::{HistoryRecord, LogHistory};
use cluster::ClusterConfig;
use election::Term;
use {ErrorKind, Result};
mod history;
#[derive(Debug)]
pub enum Log {
Prefix(LogPrefix),
Suffix(LogSuffix),
}
impl From<LogPrefix> for Log {
fn from(f: LogPrefix) -> Self {
Log::Prefix(f)
}
}
impl From<LogSuffix> for Log {
fn from(f: LogSuffix) -> Self {
Log::Suffix(f)
}
}
#[derive(Debug, Clone)]
pub struct LogPrefix {
pub tail: LogPosition,
pub config: ClusterConfig,
pub snapshot: Vec<u8>,
}
#[derive(Debug, Clone)]
pub struct LogSuffix {
pub head: LogPosition,
pub entries: Vec<LogEntry>,
}
impl LogSuffix {
pub fn tail(&self) -> LogPosition {
let prev_term = self
.entries
.last()
.map_or(self.head.prev_term, LogEntry::term);
let index = self.head.index + self.entries.len();
LogPosition { prev_term, index }
}
pub fn positions(&self) -> LogPositions {
LogPositions {
suffix: self,
offset: 0,
}
}
pub fn skip_to(&mut self, new_head: LogIndex) -> Result<()> {
track_assert!(self.head.index <= new_head, ErrorKind::InvalidInput);
track_assert!(new_head <= self.tail().index, ErrorKind::InvalidInput);
let count = new_head - self.head.index;
if count == 0 {
return Ok(());
}
let prev_term = self
.entries
.drain(0..count)
.last()
.expect("Never fails")
.term();
self.head.prev_term = prev_term;
self.head.index += count;
Ok(())
}
pub fn truncate(&mut self, new_tail: LogIndex) -> Result<()> {
track_assert!(self.head.index <= new_tail, ErrorKind::InvalidInput);
track_assert!(new_tail <= self.tail().index, ErrorKind::InvalidInput);
let delta = self.tail().index - new_tail;
let new_len = self.entries.len() - delta;
self.entries.truncate(new_len);
Ok(())
}
pub fn slice(&self, start: LogIndex, end: LogIndex) -> Result<Self> {
track_assert!(self.head.index <= start, ErrorKind::InvalidInput);
track_assert!(start <= end, ErrorKind::InvalidInput);
track_assert!(end <= self.tail().index, ErrorKind::InvalidInput);
let slice_start = start - self.head.index;
let slice_end = end - self.head.index;
let slice_head = if start == self.head.index {
self.head
} else {
let prev_term = self.entries[slice_start - 1].term();
LogPosition {
prev_term,
index: start,
}
};
let slice_entries = self.entries[slice_start..slice_end].into();
Ok(LogSuffix {
head: slice_head,
entries: slice_entries,
})
}
}
impl Default for LogSuffix {
fn default() -> Self {
LogSuffix {
head: LogPosition::default(),
entries: Vec::new(),
}
}
}
#[derive(Debug)]
pub struct LogPositions<'a> {
suffix: &'a LogSuffix,
offset: usize,
}
impl<'a> Iterator for LogPositions<'a> {
type Item = LogPosition;
fn next(&mut self) -> Option<Self::Item> {
if self.suffix.entries.len() < self.offset {
None
} else {
let id = if self.offset == 0 {
self.suffix.head
} else {
let i = self.offset - 1;
let prev_term = self.suffix.entries[i].term();
let index = self.suffix.head.index + self.offset;
LogPosition { prev_term, index }
};
self.offset += 1;
Some(id)
}
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
#[allow(missing_docs)]
pub enum LogEntry {
Noop { term: Term },
Config { term: Term, config: ClusterConfig },
Command { term: Term, command: Vec<u8> },
}
impl LogEntry {
pub fn term(&self) -> Term {
match *self {
LogEntry::Noop { term } => term,
LogEntry::Config { term, .. } => term,
LogEntry::Command { term, .. } => term,
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct ProposalId {
pub term: Term,
pub index: LogIndex,
}
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Hash)]
pub struct LogPosition {
pub prev_term: Term,
pub index: LogIndex,
}
impl LogPosition {
pub fn is_newer_or_equal_than(&self, other: LogPosition) -> bool {
self.prev_term >= other.prev_term && self.index >= other.index
}
}
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct LogIndex(u64);
impl LogIndex {
pub fn new(index: u64) -> Self {
LogIndex(index)
}
pub fn as_u64(self) -> u64 {
self.0
}
}
impl From<u64> for LogIndex {
fn from(f: u64) -> Self {
LogIndex::new(f)
}
}
impl Add<usize> for LogIndex {
type Output = Self;
fn add(self, rhs: usize) -> Self::Output {
LogIndex(self.0 + rhs as u64)
}
}
impl AddAssign<usize> for LogIndex {
fn add_assign(&mut self, rhs: usize) {
self.0 += rhs as u64;
}
}
impl Sub for LogIndex {
type Output = usize;
fn sub(self, rhs: Self) -> Self::Output {
(self.0 - rhs.0) as usize
}
}
impl Sub<usize> for LogIndex {
type Output = Self;
fn sub(self, rhs: usize) -> Self::Output {
LogIndex(self.0 - rhs as u64)
}
}
impl SubAssign<usize> for LogIndex {
fn sub_assign(&mut self, rhs: usize) {
self.0 -= rhs as u64;
}
}
#[cfg(test)]
mod test {
use super::*;
fn id(prev_term: u64, index: u64) -> LogPosition {
LogPosition {
prev_term: prev_term.into(),
index: index.into(),
}
}
fn noop(term: u64) -> LogEntry {
LogEntry::Noop { term: term.into() }
}
#[test]
fn log_suffix_end() {
let suffix = LogSuffix::default();
assert_eq!(suffix.tail().index.as_u64(), 0);
let suffix = LogSuffix {
head: LogPosition::default(),
entries: vec![noop(0), noop(1)],
};
assert_eq!(suffix.tail().index.as_u64(), 2);
}
#[test]
fn log_suffix_positions() {
let suffix = LogSuffix::default();
assert_eq!(suffix.positions().collect::<Vec<_>>(), [id(0, 0)]);
let suffix = LogSuffix {
head: LogPosition {
prev_term: 0.into(),
index: 30.into(),
},
entries: vec![noop(0), noop(2), noop(2)],
};
assert_eq!(
suffix.positions().collect::<Vec<_>>(),
[id(0, 30), id(0, 31), id(2, 32), id(2, 33)]
);
}
#[test]
fn log_suffix_skip_to() {
let mut suffix = LogSuffix {
head: LogPosition {
prev_term: 0.into(),
index: 30.into(),
},
entries: vec![noop(0), noop(2), noop(2)],
};
assert_eq!(
suffix.positions().collect::<Vec<_>>(),
[id(0, 30), id(0, 31), id(2, 32), id(2, 33)]
);
assert_eq!(suffix.entries.len(), 3);
suffix.skip_to(31.into()).unwrap();
assert_eq!(
suffix.positions().collect::<Vec<_>>(),
[id(0, 31), id(2, 32), id(2, 33)]
);
assert_eq!(suffix.entries.len(), 2);
suffix.skip_to(33.into()).unwrap();
assert_eq!(suffix.positions().collect::<Vec<_>>(), [id(2, 33)]);
assert_eq!(suffix.entries.len(), 0);
suffix.skip_to(33.into()).unwrap();
assert_eq!(suffix.positions().collect::<Vec<_>>(), [id(2, 33)]);
assert_eq!(suffix.entries.len(), 0);
}
#[test]
fn log_suffix_truncate() {
let mut suffix = LogSuffix {
head: LogPosition {
prev_term: 0.into(),
index: 30.into(),
},
entries: vec![noop(0), noop(2), noop(2)],
};
assert_eq!(
suffix.positions().collect::<Vec<_>>(),
[id(0, 30), id(0, 31), id(2, 32), id(2, 33)]
);
assert_eq!(suffix.entries.len(), 3);
suffix.truncate(31.into()).unwrap();
assert_eq!(
suffix.positions().collect::<Vec<_>>(),
[id(0, 30), id(0, 31)]
);
assert_eq!(suffix.entries.len(), 1);
}
#[test]
fn log_suffix_slice() {
let suffix = LogSuffix {
head: LogPosition {
prev_term: 0.into(),
index: 30.into(),
},
entries: vec![noop(0), noop(2), noop(2)],
};
assert_eq!(
suffix.positions().collect::<Vec<_>>(),
[id(0, 30), id(0, 31), id(2, 32), id(2, 33)]
);
assert_eq!(suffix.entries.len(), 3);
let slice = suffix.slice(31.into(), 33.into()).unwrap();
assert_eq!(
slice.positions().collect::<Vec<_>>(),
[id(0, 31), id(2, 32), id(2, 33)]
);
assert_eq!(slice.entries.len(), 2);
}
}