use std::collections::VecDeque;
use trackable::error::ErrorKindExt;
use cluster::ClusterConfig;
use log::{LogEntry, LogIndex, LogPosition, LogPrefix, LogSuffix};
use {ErrorKind, Result};
/// Bookkeeping for a log: three tail positions plus a queue of `HistoryRecord`s.
///
/// A fresh history (see `new`) starts with every tail at
/// `LogPosition::default()` and a single record holding the initial
/// cluster configuration.
#[derive(Debug, Clone)]
pub struct LogHistory {
/// Tail of the appended part of the log; written by `record_appended` and
/// `record_rollback`, and raised by `record_snapshot_installed`.
appended_tail: LogPosition,
/// Tail of the committed part; written by `record_committed` and raised by
/// `record_snapshot_installed`.
committed_tail: LogPosition,
/// Tail of the consumed part; written by `record_consumed` and
/// `record_snapshot_loaded`.
consumed_tail: LogPosition,
/// History records, oldest at the front. `get_record` and
/// `record_rollback` appear to assume ascending `head.index` order.
records: VecDeque<HistoryRecord>,
}
impl LogHistory {
pub fn new(config: ClusterConfig) -> Self {
let initial = HistoryRecord::new(LogPosition::default(), config);
LogHistory {
appended_tail: LogPosition::default(),
committed_tail: LogPosition::default(),
consumed_tail: LogPosition::default(),
records: vec![initial].into(),
}
}
/// Returns the `head` position of the oldest (front) record.
///
/// # Panics
///
/// Indexes the record queue directly, so it panics if the queue is empty.
pub fn head(&self) -> LogPosition {
    let oldest = &self.records[0];
    oldest.head
}
/// Returns the current appended tail (the `appended_tail` field).
pub fn tail(&self) -> LogPosition {
self.appended_tail
}
/// Returns the current committed tail position.
pub fn committed_tail(&self) -> LogPosition {
self.committed_tail
}
/// Returns the current consumed tail position.
pub fn consumed_tail(&self) -> LogPosition {
self.consumed_tail
}
/// Returns the cluster configuration stored in the most recent record.
pub fn config(&self) -> &ClusterConfig {
    let newest = self.last_record();
    &newest.config
}
/// Returns the newest (back) record of the history.
///
/// # Panics
///
/// Panics with the message "Never fails" if the record queue is empty.
pub fn last_record(&self) -> &HistoryRecord {
    let newest = self.records.back();
    newest.expect("Never fails")
}
/// Looks up the record that applies at `index`.
///
/// Scans from the newest record to the oldest and returns the first one
/// whose `head.index` is `<= index`, or `None` when no record qualifies.
pub fn get_record(&self, index: LogIndex) -> Option<&HistoryRecord> {
    self.records
        .iter()
        .rev()
        .find(|record| record.head.index <= index)
}
/// Records that `suffix` has been appended to the log.
///
/// Leading entries of `suffix` that lie before the current appended tail
/// are skipped. For every remaining entry a new `HistoryRecord` is pushed
/// when either
///
/// * the entry is a `LogEntry::Config` whose configuration differs from
///   the latest record's configuration, or
/// * the entry's term differs from the latest record's `head.prev_term`
///   (the new record then reuses the latest configuration).
///
/// Afterwards the appended tail is set to `suffix.tail()`.
///
/// # Errors
///
/// Returns `ErrorKind::Other` when an entry's term differs from the latest
/// record's term without being greater than it.
pub fn record_appended(&mut self, suffix: &LogSuffix) -> Result<()> {
    let first_index = suffix.head.index;

    // Number of leading entries that the history already covers.
    let skip_count = if self.appended_tail.index <= first_index {
        0
    } else {
        self.appended_tail.index - first_index
    };

    let fresh_entries = suffix.entries.iter().enumerate().skip(skip_count);
    for (offset, entry) in fresh_entries {
        // Position one past the entry at `offset` within the suffix.
        let tail = LogPosition {
            prev_term: entry.term(),
            index: first_index + offset + 1,
        };

        // A changed cluster configuration opens a new record.
        if let LogEntry::Config { ref config, .. } = *entry {
            let config_changed = self.last_record().config != *config;
            if config_changed {
                let opened = HistoryRecord::new(tail, config.clone());
                self.records.push_back(opened);
            }
        }

        // A changed term opens a new record too, unless the configuration
        // branch above already pushed one carrying this term.
        let term_changed = tail.prev_term != self.last_record().head.prev_term;
        if term_changed {
            track_assert!(
                self.last_record().head.prev_term < tail.prev_term,
                ErrorKind::Other,
                "last_record.head={:?}, tail={:?}",
                self.last_record().head,
                tail
            );
            let inherited = self.last_record().config.clone();
            self.records.push_back(HistoryRecord::new(tail, inherited));
        }
    }

    // NOTE(review): the tail is overwritten unconditionally, even if
    // `suffix` ends before the current appended tail — confirm callers
    // never pass such a suffix.
    self.appended_tail = suffix.tail();
    Ok(())
}
pub fn record_committed(&mut self, new_tail_index: LogIndex) -> Result<()> {
track_assert!(
self.committed_tail.index <= new_tail_index,
ErrorKind::Other
);
track_assert!(
new_tail_index <= self.appended_tail.index,
ErrorKind::Other,
"new_tail_index={:?}, self.appended_tail.index={:?}",
new_tail_index,
self.appended_tail.index
);
let prev_term = track!(self
.get_record(new_tail_index,)
.ok_or_else(|| ErrorKind::Other.error(),))?
.head
.prev_term;
self.committed_tail = LogPosition {
prev_term,
index: new_tail_index,
};
Ok(())
}
/// Moves the consumed tail to `new_tail_index`.
///
/// The `prev_term` of the new consumed position is taken from the head of
/// the record returned by `get_record(new_tail_index)`.
///
/// # Errors
///
/// Returns `ErrorKind::Other` when `new_tail_index` is behind the current
/// consumed tail, is beyond the committed tail, or has no matching record
/// (reported as "Too old index").
pub fn record_consumed(&mut self, new_tail_index: LogIndex) -> Result<()> {
    track_assert!(self.consumed_tail.index <= new_tail_index, ErrorKind::Other);
    track_assert!(
        new_tail_index <= self.committed_tail.index,
        ErrorKind::Other
    );

    // Copy the term out first so no borrow of `self` outlives the lookup.
    let record_term = self
        .get_record(new_tail_index)
        .map(|record| record.head.prev_term);
    let prev_term = track!(record_term.ok_or_else(
        || ErrorKind::Other.cause(format!("Too old index: {:?}", new_tail_index))
    ))?;

    self.consumed_tail = LogPosition {
        prev_term,
        index: new_tail_index,
    };
    Ok(())
}
/// Rolls the appended tail back to `new_tail`.
///
/// After the checks pass, the appended tail becomes `new_tail` and the
/// record queue is cut at the first record whose `head.index` is greater
/// than `new_tail.index` (that record and everything after it are dropped).
///
/// # Errors
///
/// * `ErrorKind::Other` when `new_tail` is beyond the appended tail or
///   behind the committed tail.
/// * `ErrorKind::InconsistentState` when the record found for
///   `new_tail.index` does not carry `new_tail.prev_term`.
pub fn record_rollback(&mut self, new_tail: LogPosition) -> Result<()> {
    track_assert!(new_tail.index <= self.appended_tail.index, ErrorKind::Other);
    track_assert!(
        self.committed_tail.index <= new_tail.index,
        ErrorKind::Other,
        "old={:?}, new={:?}",
        self.committed_tail,
        new_tail
    );
    track_assert_eq!(
        self.get_record(new_tail.index).map(|r| r.head.prev_term),
        Some(new_tail.prev_term),
        ErrorKind::InconsistentState
    );

    self.appended_tail = new_tail;

    // Truncating to the current length is a no-op, which covers the case
    // where no record starts after the new tail.
    let first_dropped = self
        .records
        .iter()
        .position(|record| record.head.index > new_tail.index);
    let kept_len = first_dropped.unwrap_or(self.records.len());
    self.records.truncate(kept_len);
    Ok(())
}
/// Records that a snapshot ending at `new_head` has been installed.
///
/// Every leading record whose `head.index` is `<= new_head.index` is
/// discarded and replaced by a single front record built from `new_head`
/// and `config`. The appended and committed tails are raised to `new_head`
/// if they were behind it; the consumed tail is left untouched.
///
/// # Errors
///
/// Returns `ErrorKind::InconsistentState` when `new_head` is behind the
/// current head of the history.
pub fn record_snapshot_installed(
    &mut self,
    new_head: LogPosition,
    config: ClusterConfig,
) -> Result<()> {
    track_assert!(
        self.head().index <= new_head.index,
        ErrorKind::InconsistentState,
        "self.head={:?}, new_head={:?}",
        self.head(),
        new_head
    );

    // Count the leading records covered by the snapshot, then drop them.
    let covered = self
        .records
        .iter()
        .take_while(|record| record.head.index <= new_head.index)
        .count();
    for _ in 0..covered {
        self.records.pop_front();
    }

    // The snapshot position becomes the new first record.
    self.records
        .push_front(HistoryRecord::new(new_head, config));

    if self.committed_tail.index < new_head.index {
        self.committed_tail = new_head;
    }
    if self.appended_tail.index < new_head.index {
        self.appended_tail = new_head;
    }
    Ok(())
}
/// Records that `snapshot` has been loaded.
///
/// When the snapshot's tail is ahead of the consumed tail, the consumed
/// tail is moved to `snapshot.tail`; otherwise nothing changes.
///
/// # Errors
///
/// Returns `ErrorKind::InconsistentState` when the consumed tail would have
/// to move beyond the committed tail.
pub fn record_snapshot_loaded(&mut self, snapshot: &LogPrefix) -> Result<()> {
    let moves_forward = self.consumed_tail.index < snapshot.tail.index;
    if !moves_forward {
        return Ok(());
    }

    track_assert!(
        snapshot.tail.index <= self.committed_tail.index,
        ErrorKind::InconsistentState,
        "snapshot.tail.index={:?}, self.committed_tail.index={:?}",
        snapshot.tail.index,
        self.committed_tail.index
    );
    self.consumed_tail = snapshot.tail;
    Ok(())
}
}
/// One entry of a `LogHistory`: a log position paired with a cluster
/// configuration.
#[derive(Debug, Clone)]
pub struct HistoryRecord {
/// Position at which this record starts; `LogHistory::get_record` matches
/// records by comparing `head.index` with the requested index.
pub head: LogPosition,
/// Cluster configuration associated with this record.
pub config: ClusterConfig,
}
impl HistoryRecord {
fn new(head: LogPosition, config: ClusterConfig) -> Self {
HistoryRecord { head, config }
}
}