//! A representation of not-yet-committed log entries and state.
// Copyright 2016 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::eraftpb::{Entry, Snapshot};
/// The unstable.entries[i] has raft log position i+unstable.offset.
/// Note that unstable.offset may be less than the highest log
/// position in storage; this means that the next write to storage
/// might need to truncate the log before persisting unstable.entries.
#[derive(Debug, PartialEq, Default)]
pub struct Unstable {
/// The incoming unstable snapshot, if any.
pub snapshot: Option<Snapshot>,
/// All entries that have not yet been written to storage.
pub entries: Vec<Entry>,
/// The offset from the vector index.
pub offset: u64,
/// The tag to use when logging.
pub tag: String,
}
impl Unstable {
/// Creates a new log of unstable entries.
pub fn new(offset: u64, tag: String) -> Unstable {
Unstable {
offset,
snapshot: None,
entries: vec![],
tag,
}
}
/// Returns the index of the first possible entry in entries
/// if it has a snapshot.
pub fn maybe_first_index(&self) -> Option<u64> {
self.snapshot
.as_ref()
.map(|snap| snap.get_metadata().index + 1)
}
/// Returns the last index if it has at least one unstable entry or snapshot.
pub fn maybe_last_index(&self) -> Option<u64> {
match self.entries.len() {
0 => self.snapshot.as_ref().map(|snap| snap.get_metadata().index),
len => Some(self.offset + len as u64 - 1),
}
}
/// Returns the term of the entry at index idx, if there is any.
pub fn maybe_term(&self, idx: u64) -> Option<u64> {
if idx < self.offset {
let snapshot = self.snapshot.as_ref()?;
let meta = snapshot.get_metadata();
if idx == meta.index {
Some(meta.term)
} else {
None
}
} else {
self.maybe_last_index().and_then(|last| {
if idx > last {
return None;
}
Some(self.entries[(idx - self.offset) as usize].term)
})
}
}
/// Moves the stable offset up to the index. Provided that the index
/// is in the same election term.
pub fn stable_to(&mut self, idx: u64, term: u64) {
let t = self.maybe_term(idx);
if t.is_none() {
return;
}
if t.unwrap() == term && idx >= self.offset {
let start = idx + 1 - self.offset;
self.entries.drain(..start as usize);
self.offset = idx + 1;
}
}
/// Removes the snapshot from self if the index of the snapshot matches
pub fn stable_snap_to(&mut self, idx: u64) {
if self.snapshot.is_none() {
return;
}
if idx == self.snapshot.as_ref().unwrap().get_metadata().index {
self.snapshot = None;
}
}
/// From a given snapshot, restores the snapshot to self, but doesn't unpack.
pub fn restore(&mut self, snap: Snapshot) {
self.entries.clear();
self.offset = snap.get_metadata().index + 1;
self.snapshot = Some(snap);
}
/// Append entries to unstable, truncate local block first if overlapped.
pub fn truncate_and_append(&mut self, ents: &[Entry]) {
let after = ents[0].index;
if after == self.offset + self.entries.len() as u64 {
// after is the next index in the self.entries, append directly
self.entries.extend_from_slice(ents);
} else if after <= self.offset {
// The log is being truncated to before our current offset
// portion, so set the offset and replace the entries
self.offset = after;
self.entries.clear();
self.entries.extend_from_slice(ents);
} else {
// truncate to after and copy to self.entries then append
let off = self.offset;
self.must_check_outofbounds(off, after);
self.entries.truncate((after - off) as usize);
self.entries.extend_from_slice(ents);
}
}
/// Returns a slice of entries between the high and low.
///
/// # Panics
///
/// Panics if the `lo` or `hi` are out of bounds.
/// Panics if `lo > hi`.
pub fn slice(&self, lo: u64, hi: u64) -> &[Entry] {
self.must_check_outofbounds(lo, hi);
let l = lo as usize;
let h = hi as usize;
let off = self.offset as usize;
&self.entries[l - off..h - off]
}
/// Asserts the `hi` and `lo` values against each other and against the
/// entries themselves.
pub fn must_check_outofbounds(&self, lo: u64, hi: u64) {
if lo > hi {
panic!("{} invalid unstable.slice {} > {}", self.tag, lo, hi)
}
let upper = self.offset + self.entries.len() as u64;
if lo < self.offset || hi > upper {
panic!(
"{} unstable.slice[{}, {}] out of bound[{}, {}]",
self.tag, lo, hi, self.offset, upper
)
}
}
}
#[cfg(test)]
mod test {
use crate::default_logger;
use crate::eraftpb::{Entry, Snapshot, SnapshotMetadata};
use crate::log_unstable::Unstable;
fn new_entry(index: u64, term: u64) -> Entry {
let mut e = Entry::default();
e.term = term;
e.index = index;
e
}
fn new_snapshot(index: u64, term: u64) -> Snapshot {
let mut snap = Snapshot::default();
let mut meta = SnapshotMetadata::default();
meta.index = index;
meta.term = term;
snap.set_metadata(meta);
snap
}
#[test]
fn test_maybe_first_index() {
default_logger().new(o!("test" => "maybe_first_index"));
// entry, offset, snap, wok, windex,
let tests = vec![
// no snapshot
(Some(new_entry(5, 1)), 5, None, false, 0),
(None, 0, None, false, 0),
// has snapshot
(Some(new_entry(5, 1)), 5, Some(new_snapshot(4, 1)), true, 5),
(None, 5, Some(new_snapshot(4, 1)), true, 5),
];
for (entries, offset, snapshot, wok, windex) in tests {
let u = Unstable {
entries: entries.map_or(vec![], |entry| vec![entry]),
offset,
snapshot,
..Default::default()
};
let index = u.maybe_first_index();
match index {
None => assert!(!wok),
Some(index) => assert_eq!(index, windex),
}
}
}
#[test]
fn test_maybe_last_index() {
default_logger().new(o!("test" => "maybe_last_index"));
// entry, offset, snap, wok, windex,
let tests = vec![
(Some(new_entry(5, 1)), 5, None, true, 5),
(Some(new_entry(5, 1)), 5, Some(new_snapshot(4, 1)), true, 5),
// last in snapshot
(None, 5, Some(new_snapshot(4, 1)), true, 4),
// empty unstable
(None, 0, None, false, 0),
];
for (entries, offset, snapshot, wok, windex) in tests {
let u = Unstable {
entries: entries.map_or(vec![], |entry| vec![entry]),
offset,
snapshot,
..Default::default()
};
let index = u.maybe_last_index();
match index {
None => assert!(!wok),
Some(index) => assert_eq!(index, windex),
}
}
}
#[test]
fn test_maybe_term() {
default_logger().new(o!("test" => "maybe_term"));
// entry, offset, snap, index, wok, wterm
let tests = vec![
// term from entries
(Some(new_entry(5, 1)), 5, None, 5, true, 1),
(Some(new_entry(5, 1)), 5, None, 6, false, 0),
(Some(new_entry(5, 1)), 5, None, 4, false, 0),
(
Some(new_entry(5, 1)),
5,
Some(new_snapshot(4, 1)),
5,
true,
1,
),
(
Some(new_entry(5, 1)),
5,
Some(new_snapshot(4, 1)),
6,
false,
0,
),
// term from snapshot
(
Some(new_entry(5, 1)),
5,
Some(new_snapshot(4, 1)),
4,
true,
1,
),
(
Some(new_entry(5, 1)),
5,
Some(new_snapshot(4, 1)),
3,
false,
0,
),
(None, 5, Some(new_snapshot(4, 1)), 5, false, 0),
(None, 5, Some(new_snapshot(4, 1)), 4, true, 1),
(None, 0, None, 5, false, 0),
];
for (entries, offset, snapshot, index, wok, wterm) in tests {
let u = Unstable {
entries: entries.map_or(vec![], |entry| vec![entry]),
offset,
snapshot,
..Default::default()
};
let term = u.maybe_term(index);
match term {
None => assert!(!wok),
Some(term) => assert_eq!(term, wterm),
}
}
}
#[test]
fn test_restore() {
default_logger().new(o!("test" => "restore"));
let mut u = Unstable {
entries: vec![new_entry(5, 1)],
offset: 5,
snapshot: Some(new_snapshot(4, 1)),
..Default::default()
};
let s = new_snapshot(6, 2);
u.restore(s.clone());
assert_eq!(u.offset, s.get_metadata().index + 1);
assert!(u.entries.is_empty());
assert_eq!(u.snapshot.unwrap(), s);
}
#[test]
fn test_stable_to() {
default_logger().new(o!("test" => "stable_to"));
// entries, offset, snap, index, term, woffset, wlen
let tests = vec![
(vec![], 0, None, 5, 1, 0, 0),
// stable to the first entry
(vec![new_entry(5, 1)], 5, None, 5, 1, 6, 0),
(vec![new_entry(5, 1), new_entry(6, 0)], 5, None, 5, 1, 6, 1),
// stable to the first entry and term mismatch
(vec![new_entry(6, 2)], 5, None, 6, 1, 5, 1),
// stable to old entry
(vec![new_entry(5, 1)], 5, None, 4, 1, 5, 1),
(vec![new_entry(5, 1)], 5, None, 4, 2, 5, 1),
// with snapshot
// stable to the first entry
(
vec![new_entry(5, 1)],
5,
Some(new_snapshot(4, 1)),
5,
1,
6,
0,
),
// stable to the first entry
(
vec![new_entry(5, 1), new_entry(6, 1)],
5,
Some(new_snapshot(4, 1)),
5,
1,
6,
1,
),
// stable to the first entry and term mismatch
(
vec![new_entry(6, 2)],
5,
Some(new_snapshot(5, 1)),
6,
1,
5,
1,
),
// stable to snapshot
(
vec![new_entry(5, 1)],
5,
Some(new_snapshot(4, 1)),
4,
1,
5,
1,
),
// stable to old entry
(
vec![new_entry(5, 2)],
5,
Some(new_snapshot(4, 2)),
4,
1,
5,
1,
),
];
for (entries, offset, snapshot, index, term, woffset, wlen) in tests {
let mut u = Unstable {
entries,
offset,
snapshot,
..Default::default()
};
u.stable_to(index, term);
assert_eq!(u.offset, woffset);
assert_eq!(u.entries.len(), wlen);
}
}
#[test]
fn test_truncate_and_append() {
default_logger().new(o!("test" => "truncate_and_append"));
// entries, offset, snap, to_append, woffset, wentries
let tests = vec![
// replace to the end
(
vec![new_entry(5, 1)],
5,
None,
vec![new_entry(6, 1), new_entry(7, 1)],
5,
vec![new_entry(5, 1), new_entry(6, 1), new_entry(7, 1)],
),
// replace to unstable entries
(
vec![new_entry(5, 1)],
5,
None,
vec![new_entry(5, 2), new_entry(6, 2)],
5,
vec![new_entry(5, 2), new_entry(6, 2)],
),
(
vec![new_entry(5, 1)],
5,
None,
vec![new_entry(4, 2), new_entry(5, 2), new_entry(6, 2)],
4,
vec![new_entry(4, 2), new_entry(5, 2), new_entry(6, 2)],
),
// truncate existing entries and append
(
vec![new_entry(5, 1), new_entry(6, 1), new_entry(7, 1)],
5,
None,
vec![new_entry(6, 2)],
5,
vec![new_entry(5, 1), new_entry(6, 2)],
),
(
vec![new_entry(5, 1), new_entry(6, 1), new_entry(7, 1)],
5,
None,
vec![new_entry(7, 2), new_entry(8, 2)],
5,
vec![
new_entry(5, 1),
new_entry(6, 1),
new_entry(7, 2),
new_entry(8, 2),
],
),
];
for (entries, offset, snapshot, to_append, woffset, wentries) in tests {
let mut u = Unstable {
entries,
offset,
snapshot,
..Default::default()
};
u.truncate_and_append(&to_append);
assert_eq!(u.offset, woffset);
assert_eq!(u.entries, wentries);
}
}
}