use std::convert::TryFrom;
use std::iter::{DoubleEndedIterator, FusedIterator};
use crate::serde::{Entry, Index, Term};
pub struct CompressedLog {
begin: Option<(Term, Index)>,
end: Option<(Term, Index)>,
term_changes: Vec<(Term, Index)>,
}
impl CompressedLog {
pub fn new() -> CompressedLog {
CompressedLog { begin: None, end: None, term_changes: Vec::new() }
}
pub fn first(&self) -> (Term, Index) {
self.begin.unwrap_or((Term(0), Index(0)))
}
pub fn last(&self) -> (Term, Index) {
self.end.unwrap_or((Term(0), Index(0)))
}
pub fn index_term(&self, index: Index) -> Option<Term> {
if index == Index(0) {
return Some(Term(0));
}
self.iter().find(|(_, i)| *i == index).map(|(t, _)| t)
}
pub fn extend(&mut self, entries: &[Entry]) {
if let Some(entry) = entries.first() {
self.trim(Index(entry.index.0 - 1));
}
self.extend_trimmed(entries);
}
fn trim(&mut self, index: Index) {
while let Some((_, tc_index)) = self.term_changes.last().copied() {
if index >= tc_index {
break;
}
self.term_changes.pop();
}
match self.term_changes.last().copied() {
None => {
self.begin = None;
self.end = None;
}
Some((tc_term, _)) => {
self.end = Some((tc_term, index));
}
}
}
fn extend_trimmed(&mut self, entries: &[Entry]) {
for entry in entries {
if self.begin == None {
self.begin = Some((entry.term, entry.index));
}
debug_assert_eq!(entry.index, Index(self.end.map_or(0, |(_, i)| i.0) + 1));
self.end = Some((entry.term, entry.index));
if self.term_changes.last().copied().map_or(true, |(tc_term, _)| entry.term != tc_term) {
self.term_changes.push((entry.term, entry.index));
}
}
}
pub fn iter<'a>(&'a self) -> CompressedLogIterator<'a> {
let (begin_term, begin_index) = self.first();
let (end_term, end_index) =
self.end.map_or((begin_term, begin_index), |(end_term, end_index)| {
(end_term, Index(end_index.0 + 1))
});
CompressedLogIterator {
begin_term: begin_term,
begin_index: begin_index,
end_term: end_term,
end_index: end_index,
term_changes: &self.term_changes,
}
}
}
pub struct CompressedLogIterator<'a> {
begin_term: Term,
begin_index: Index,
end_term: Term,
end_index: Index,
term_changes: &'a [(Term, Index)],
}
impl<'a> Iterator for CompressedLogIterator<'a> {
type Item = (Term, Index);
fn size_hint(&self) -> (usize, Option<usize>) {
let len = usize::try_from(self.end_index.0 - self.begin_index.0).unwrap_or(usize::MAX);
(len, Some(len))
}
fn next(&mut self) -> Option<Self::Item> {
if self.begin_index == self.end_index {
return None;
}
let term = self.begin_term;
let index = self.begin_index;
if let Some((_, tc_index)) = self.term_changes.first().copied() {
if self.begin_index == tc_index {
self.term_changes = &self.term_changes[1..];
}
}
self.begin_index = Index(self.begin_index.0 + 1);
if let Some((tc_term, tc_index)) = self.term_changes.first().copied() {
if self.begin_index == tc_index {
self.begin_term = tc_term;
}
}
Some((term, index))
}
}
impl<'a> DoubleEndedIterator for CompressedLogIterator<'a> {
fn next_back(&mut self) -> Option<Self::Item> {
if self.begin_index == self.end_index {
return None;
}
self.end_index = Index(self.end_index.0 - 1);
if let Some((_, tc_index)) = self.term_changes.last().copied() {
if self.end_index < tc_index {
self.term_changes = &self.term_changes[..self.term_changes.len() - 1];
self.end_term = self.term_changes.last().map_or(self.begin_term, |(tc_term, _)| *tc_term);
}
}
Some((self.end_term, self.end_index))
}
}
impl<'a> ExactSizeIterator for CompressedLogIterator<'a> {}
impl<'a> FusedIterator for CompressedLogIterator<'a> {}
#[cfg(test)]
mod tests {
#![allow(clippy::wildcard_imports)]
use super::*;
#[test]
fn empty() {
let log = CompressedLog::new();
assert_eq!((Term(0), Index(0)), log.first());
assert_eq!((Term(0), Index(0)), log.last());
assert_eq!(Some(Term(0)), log.index_term(Index(0)));
assert_eq!(None, log.index_term(Index(1)));
assert_eq!(0, log.iter().len());
assert_eq!(0, log.iter().rev().len());
assert_eq!(Vec::<(Term, Index)>::new(), log.iter().collect::<Vec<_>>());
assert_eq!(Vec::<(Term, Index)>::new(), log.iter().rev().collect::<Vec<_>>());
}
#[test]
fn compressed_log() {
let mut log = CompressedLog::new();
let history =
vec![(Term(1), Index(1)), (Term(2), Index(2)), (Term(2), Index(3)), (Term(3), Index(4))];
let history_rev = history.iter().rev().copied().collect::<Vec<_>>();
let entries = history
.iter()
.copied()
.map(|(term, index)| Entry { term: term, index: index, payload: vec![] })
.collect::<Vec<_>>();
log.extend(&entries);
assert_eq!((Term(1), Index(1)), log.first());
assert_eq!((Term(3), Index(4)), log.last());
assert_eq!(Some(Term(0)), log.index_term(Index(0)));
assert_eq!(Some(Term(1)), log.index_term(Index(1)));
assert_eq!(Some(Term(3)), log.index_term(Index(4)));
assert_eq!(None, log.index_term(Index(5)));
assert_eq!(4, log.iter().len());
assert_eq!(4, log.iter().rev().len());
assert_eq!(history, log.iter().collect::<Vec<_>>());
assert_eq!(history_rev, log.iter().rev().collect::<Vec<_>>());
assert_eq!(history, log.iter().collect::<Vec<_>>());
assert_eq!(history_rev, log.iter().rev().collect::<Vec<_>>());
}
#[test]
fn extend() {
let mut log = CompressedLog::new();
let history =
vec![(Term(1), Index(1)), (Term(2), Index(2)), (Term(2), Index(3)), (Term(3), Index(4))];
let entries = history
.iter()
.copied()
.map(|(term, index)| Entry { term: term, index: index, payload: vec![] })
.collect::<Vec<_>>();
log.extend(&entries);
assert_eq!(history, log.iter().collect::<Vec<_>>());
log.extend(&entries[1..3]);
assert_eq!(history[..3], log.iter().collect::<Vec<_>>()[..]);
log.extend(&entries[2..]);
assert_eq!(history, log.iter().collect::<Vec<_>>());
let alt_history = vec![(Term(5), Index(1)), (Term(6), Index(2)), (Term(7), Index(3))];
let alt_entries = alt_history
.iter()
.copied()
.map(|(term, index)| Entry { term: term, index: index, payload: vec![] })
.collect::<Vec<_>>();
log.extend(&alt_entries[1..]);
let mut expected = vec![history[0]];
expected.extend(alt_history[1..].iter());
assert_eq!(expected, log.iter().collect::<Vec<_>>());
log.extend(&alt_entries);
assert_eq!(alt_history, log.iter().collect::<Vec<_>>());
log.trim(Index(0));
assert_eq!((Term(0), Index(0)), log.first());
assert_eq!((Term(0), Index(0)), log.last());
}
}