use std::sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard};
use crate::eraftpb::*;
use crate::errors::{Error, Result, StorageError};
use crate::util::limit_size;
/// Raft state kept by a storage: the hard state, the current configuration state, and an
/// optional pending configuration change.
#[derive(Debug, Clone, Default, Getters, Setters)]
pub struct RaftState {
/// Hard state; this file reads and writes its `commit` and `term` fields.
pub hard_state: HardState,
/// Current configuration state. A value equal to `ConfState::default()` is treated as
/// "not initialized" by `RaftState::initialized`.
pub conf_state: ConfState,
/// Configuration state of a pending membership change, if one has been recorded.
#[get = "pub"]
#[set]
pending_conf_state: Option<ConfState>,
/// Start index of the pending membership change; this file always sets it together with
/// `pending_conf_state`.
#[get = "pub"]
#[set]
pending_conf_state_start_index: Option<u64>,
}
impl RaftState {
    /// Builds a `RaftState` from the given hard state and configuration state, with no
    /// pending configuration change recorded.
    pub fn new(hard_state: HardState, conf_state: ConfState) -> RaftState {
        Self {
            pending_conf_state_start_index: None,
            pending_conf_state: None,
            conf_state,
            hard_state,
        }
    }

    /// Reports whether the state has been initialized, i.e. whether `conf_state` differs
    /// from `ConfState::default()`.
    pub fn initialized(&self) -> bool {
        let uninitialized = ConfState::default();
        self.conf_state != uninitialized
    }
}
/// Read-side interface to a Raft log and its persisted state. The notes on each method
/// describe how the `MemStorage` implementation in this file behaves.
pub trait Storage {
/// Returns the stored `RaftState` (`MemStorage` returns a clone of its current state).
fn initial_state(&self) -> Result<RaftState>;
/// Returns the log entries with indexes in the half-open range `[low, high)`, after passing
/// them through `limit_size` with `max_size`. `MemStorage` returns
/// `StorageError::Compacted` when `low` is below the first index and panics when `high` is
/// greater than the last index plus one.
fn entries(&self, low: u64, high: u64, max_size: impl Into<Option<u64>>) -> Result<Vec<Entry>>;
/// Returns the term of the entry at `idx`. `MemStorage` also answers for the snapshot index
/// itself, and reports `Compacted` / `Unavailable` for indexes before / after the log.
fn term(&self, idx: u64) -> Result<u64>;
/// Returns the index of the first available entry; with an empty log `MemStorage` reports
/// the snapshot index plus one.
fn first_index(&self) -> Result<u64>;
/// Returns the index of the last entry; with an empty log `MemStorage` reports the
/// snapshot index.
fn last_index(&self) -> Result<u64>;
/// Returns a snapshot. `MemStorage` raises the snapshot's metadata index to
/// `request_index` when it would otherwise be lower, and can be made to fail once with
/// `StorageError::SnapshotTemporarilyUnavailable`.
fn snapshot(&self, request_index: u64) -> Result<Snapshot>;
}
/// In-memory state behind `MemStorage`: the Raft state, the log entries, and the metadata of
/// the most recent snapshot.
pub struct MemStorageCore {
/// Hard state, configuration state and any pending configuration change.
raft_state: RaftState,
/// Log entries held in memory. Index arithmetic in this file (`index - entries[0].index`)
/// assumes consecutive indexes.
entries: Vec<Entry>,
/// Snapshot metadata, written by `apply_snapshot` and `initialize_with_conf_state`; its
/// `index` determines `first_index`/`last_index` while `entries` is empty.
snapshot_metadata: SnapshotMetadata,
/// When set, the next `Storage::snapshot` call on the owning `MemStorage` fails with
/// `SnapshotTemporarilyUnavailable` and clears the flag.
trigger_snap_unavailable: bool,
}
impl Default for MemStorageCore {
fn default() -> MemStorageCore {
MemStorageCore {
raft_state: Default::default(),
entries: vec![],
snapshot_metadata: Default::default(),
trigger_snap_unavailable: false,
}
}
}
impl MemStorageCore {
/// Replaces the stored hard state with `hs`.
pub fn set_hardstate(&mut self, hs: HardState) {
self.raft_state.hard_state = hs;
}
/// Returns a shared reference to the stored hard state.
pub fn hard_state(&self) -> &HardState {
&self.raft_state.hard_state
}
/// Returns a mutable reference to the stored hard state.
pub fn mut_hard_state(&mut self) -> &mut HardState {
&mut self.raft_state.hard_state
}
/// Marks `index` as committed and sets the hard state's term to the term of the entry at
/// that index.
///
/// # Panics
///
/// Panics if the log holds no entry at `index`.
pub fn commit_to(&mut self, index: u64) -> Result<()> {
    let entry_present = self.has_entry_at(index);
    assert!(
        entry_present,
        "commit_to {} but the entry not exists",
        index
    );

    // The assertion above guarantees a non-empty log, so `entries[0]` exists.
    let position = (index - self.entries[0].index) as usize;
    let hard_state = &mut self.raft_state.hard_state;
    hard_state.commit = index;
    hard_state.term = self.entries[position].term;
    Ok(())
}
/// Stores `cs` as the current configuration state.
///
/// When `pending_membership_change` is `Some((conf_state, start_index))`, both pending
/// fields are overwritten as well; when it is `None`, they are left untouched.
pub fn set_conf_state(
    &mut self,
    cs: ConfState,
    pending_membership_change: Option<(ConfState, u64)>,
) {
    let state = &mut self.raft_state;
    state.conf_state = cs;
    if let Some((pending_cs, start_index)) = pending_membership_change {
        state.pending_conf_state_start_index = Some(start_index);
        state.pending_conf_state = Some(pending_cs);
    }
}
/// Returns `true` when the log is non-empty and `index` lies between the first and last
/// stored entry indexes (inclusive).
#[inline]
fn has_entry_at(&self, index: u64) -> bool {
    match (self.entries.first(), self.entries.last()) {
        (Some(head), Some(tail)) => head.index <= index && index <= tail.index,
        // An empty log contains no entry at any index.
        _ => false,
    }
}
/// Index of the first stored entry, or the snapshot index plus one when the log is empty.
fn first_index(&self) -> u64 {
    self.entries
        .first()
        .map_or_else(|| self.snapshot_metadata.index + 1, |head| head.index)
}
/// Index of the last stored entry, or the snapshot index when the log is empty.
fn last_index(&self) -> u64 {
    if let Some(tail) = self.entries.last() {
        tail.index
    } else {
        self.snapshot_metadata.index
    }
}
/// Overwrites the storage with the state described by `snapshot`.
///
/// The snapshot metadata is stored, all entries are dropped, the hard state's `term` and
/// `commit` are set to the snapshot's term and index, and the configuration state is taken
/// from the metadata. A pending membership change is recorded only when the metadata's
/// `pending_membership_change_index` is greater than zero.
///
/// # Errors
///
/// Returns `StorageError::SnapshotOutOfDate`, without modifying anything, when the snapshot
/// index is lower than `first_index()`.
pub fn apply_snapshot(&mut self, mut snapshot: Snapshot) -> Result<()> {
    let mut meta = snapshot.take_metadata();
    let (snap_index, snap_term) = (meta.index, meta.term);
    if snap_index < self.first_index() {
        return Err(Error::Store(StorageError::SnapshotOutOfDate));
    }

    // Keep a complete copy of the metadata before fields are moved out of it below.
    self.snapshot_metadata = meta.clone();
    self.entries.clear();

    let state = &mut self.raft_state;
    state.hard_state.term = snap_term;
    state.hard_state.commit = snap_index;
    state.conf_state = meta.take_conf_state();

    let pending_index = meta.pending_membership_change_index;
    if pending_index > 0 {
        state.pending_conf_state = Some(meta.take_pending_membership_change());
        state.pending_conf_state_start_index = Some(pending_index);
    }
    Ok(())
}
fn snapshot(&self) -> Snapshot {
let mut snapshot = Snapshot::default();
let applied_idx = self.raft_state.hard_state.commit;
let term = self.raft_state.hard_state.term;
let meta = snapshot.mut_metadata();
meta.index = applied_idx;
meta.term = term;
meta.set_conf_state(self.raft_state.conf_state.clone());
if let Some(ref cs) = self.raft_state.pending_conf_state {
let i = self.raft_state.pending_conf_state_start_index.unwrap();
meta.set_pending_membership_change(cs.clone());
meta.pending_membership_change_index = i;
}
snapshot
}
/// Discards every entry with an index lower than `compact_index`.
///
/// A `compact_index` at or below `first_index()` is a no-op rather than an error.
///
/// # Panics
///
/// Panics if `compact_index` is greater than `last_index() + 1`.
pub fn compact(&mut self, compact_index: u64) -> Result<()> {
    let first = self.first_index();
    if compact_index <= first {
        return Ok(());
    }

    let last = self.last_index();
    if compact_index > last + 1 {
        panic!(
            "compact not received raft logs: {}, last index: {}",
            compact_index, last
        );
    }

    if let Some(head_index) = self.entries.first().map(|head| head.index) {
        let removed = (compact_index - head_index) as usize;
        self.entries.drain(..removed);
    }
    Ok(())
}
/// Appends `ents` to the log, first dropping any stored entries at or after the index of
/// `ents[0]`. An empty slice is a no-op.
///
/// # Panics
///
/// Panics if `ents[0].index` is lower than `first_index()` (it would overwrite compacted
/// entries) or greater than `last_index() + 1` (it would leave a gap in the log).
pub fn append(&mut self, ents: &[Entry]) -> Result<()> {
    let incoming_first = match ents.first() {
        Some(head) => head.index,
        None => return Ok(()),
    };

    let first = self.first_index();
    if first > incoming_first {
        panic!(
            "overwrite compacted raft logs, compacted: {}, append: {}",
            first - 1,
            incoming_first,
        );
    }

    let last = self.last_index();
    if last + 1 < incoming_first {
        panic!(
            "raft logs should be continuous, last index: {}, new appended: {}",
            last, incoming_first,
        );
    }

    // Everything from the first incoming index onwards is replaced by `ents`.
    let keep = (incoming_first - first) as usize;
    self.entries.drain(keep..);
    self.entries.extend_from_slice(ents);
    Ok(())
}
/// Commits up to `idx` via `commit_to`, then optionally updates the configuration states.
///
/// `cs`, when present, replaces the current configuration state. When
/// `pending_membership_change` is present, its configuration and `start_index` are recorded
/// as the pending change. Absent arguments leave the corresponding fields untouched.
///
/// # Panics
///
/// Panics under the same condition as `commit_to`: no entry exists at `idx`.
pub fn commit_to_and_set_conf_states(
    &mut self,
    idx: u64,
    cs: Option<ConfState>,
    pending_membership_change: Option<ConfChange>,
) -> Result<()> {
    self.commit_to(idx)?;

    let state = &mut self.raft_state;
    if let Some(conf_state) = cs {
        state.conf_state = conf_state;
    }
    if let Some(mut change) = pending_membership_change {
        let start_index = change.start_index;
        state.pending_conf_state = Some(change.take_configuration());
        state.pending_conf_state_start_index = Some(start_index);
    }
    Ok(())
}
/// Arms the flag that makes the next `Storage::snapshot` call on the owning `MemStorage`
/// return `StorageError::SnapshotTemporarilyUnavailable` (that call clears the flag again).
pub fn trigger_snap_unavailable(&mut self) {
self.trigger_snap_unavailable = true;
}
}
/// Handle to a `MemStorageCore` shared behind `Arc<RwLock<..>>`. Cloning the handle clones
/// the `Arc`, so all clones refer to the same core.
#[derive(Clone, Default)]
pub struct MemStorage {
/// The shared, lock-protected storage state; accessed through `rl()` and `wl()`.
core: Arc<RwLock<MemStorageCore>>,
}
impl MemStorage {
pub fn new() -> MemStorage {
MemStorage {
..Default::default()
}
}
pub fn new_with_conf_state<T>(conf_state: T) -> MemStorage
where
ConfState: From<T>,
{
let store = MemStorage::new();
store.initialize_with_conf_state(conf_state);
store
}
pub fn initialize_with_conf_state<T>(&self, conf_state: T)
where
ConfState: From<T>,
{
assert!(!self.initial_state().unwrap().initialized());
let mut core = self.wl();
core.snapshot_metadata.index = 1;
core.snapshot_metadata.term = 1;
core.raft_state.hard_state.commit = 1;
core.raft_state.hard_state.term = 1;
core.raft_state.conf_state = ConfState::from(conf_state);
}
/// Acquires the read lock on the core and returns the guard.
///
/// Panics (via `unwrap`) if the lock is poisoned.
pub fn rl(&self) -> RwLockReadGuard<'_, MemStorageCore> {
self.core.read().unwrap()
}
/// Acquires the write lock on the core and returns the guard.
///
/// Panics (via `unwrap`) if the lock is poisoned.
pub fn wl(&self) -> RwLockWriteGuard<'_, MemStorageCore> {
self.core.write().unwrap()
}
}
impl Storage for MemStorage {
/// Returns a clone of the current `RaftState`, taken under the read lock.
fn initial_state(&self) -> Result<RaftState> {
    let core = self.rl();
    let state = core.raft_state.clone();
    Ok(state)
}
/// Returns clones of the entries with indexes in `[low, high)`, passed through `limit_size`
/// with `max_size`.
///
/// An empty range that starts at `first_index()` is valid even when the log holds no
/// entries, and yields an empty vector.
///
/// # Errors
///
/// Returns `StorageError::Compacted` when `low` is lower than `first_index()`.
///
/// # Panics
///
/// Panics if `high` is greater than `last_index() + 1`, or if `high < low`.
fn entries(&self, low: u64, high: u64, max_size: impl Into<Option<u64>>) -> Result<Vec<Entry>> {
    let max_size = max_size.into();
    let core = self.rl();

    // `first_index()` equals `entries[0].index` whenever the log is non-empty, and stays
    // well-defined when it is empty, where indexing `entries[0]` would panic.
    let offset = core.first_index();
    if low < offset {
        return Err(Error::Store(StorageError::Compacted));
    }

    if high > core.last_index() + 1 {
        panic!(
            "index out of bound (last: {}, high: {})",
            core.last_index() + 1,
            high
        );
    }

    let lo = (low - offset) as usize;
    let hi = (high - offset) as usize;
    let mut ents = core.entries[lo..hi].to_vec();
    limit_size(&mut ents, max_size);
    Ok(ents)
}
fn term(&self, idx: u64) -> Result<u64> {
let core = self.rl();
if idx == core.snapshot_metadata.index {
return Ok(core.snapshot_metadata.term);
}
if idx < core.first_index() {
return Err(Error::Store(StorageError::Compacted));
}
let offset = core.entries[0].index;
assert!(idx >= offset);
if idx - offset >= core.entries.len() as u64 {
return Err(Error::Store(StorageError::Unavailable));
}
Ok(core.entries[(idx - offset) as usize].term)
}
/// Returns the core's first index, read under the read lock; never fails.
fn first_index(&self) -> Result<u64> {
    let core = self.rl();
    Ok(core.first_index())
}
/// Returns the core's last index, read under the read lock; never fails.
fn last_index(&self) -> Result<u64> {
    let core = self.rl();
    Ok(core.last_index())
}
fn snapshot(&self, request_index: u64) -> Result<Snapshot> {
let mut core = self.wl();
if core.trigger_snap_unavailable {
core.trigger_snap_unavailable = false;
Err(Error::Store(StorageError::SnapshotTemporarilyUnavailable))
} else {
let mut snap = core.snapshot();
if snap.get_metadata().index < request_index {
snap.mut_metadata().index = request_index;
}
Ok(snap)
}
}
}
#[cfg(test)]
mod test {
use std::panic::{self, AssertUnwindSafe};
use protobuf::Message as PbMessage;
use crate::default_logger;
use crate::eraftpb::{ConfState, Entry, Snapshot};
use crate::errors::{Error as RaftError, StorageError};
use super::{MemStorage, Storage};
/// Test helper: a default `Entry` with only `index` and `term` set.
fn new_entry(index: u64, term: u64) -> Entry {
    let mut entry = Entry::default();
    entry.index = index;
    entry.term = term;
    entry
}
/// Test helper: returns `m.compute_size()` cast to `u32`.
fn size_of<T: PbMessage>(m: &T) -> u32 {
m.compute_size() as u32
}
/// Test helper: a default `Snapshot` whose metadata carries `index`, `term` and a
/// configuration state with the given `nodes`.
fn new_snapshot(index: u64, term: u64, nodes: Vec<u64>) -> Snapshot {
    let mut snapshot = Snapshot::default();
    {
        let meta = snapshot.mut_metadata();
        meta.index = index;
        meta.term = term;
        meta.mut_conf_state().nodes = nodes;
    }
    snapshot
}
/// `term` reports `Compacted` before the log, the entry term inside it, and `Unavailable`
/// past its end.
#[test]
fn test_storage_term() {
    default_logger().new(o!("test" => "storage_term"));
    let ents = vec![new_entry(3, 3), new_entry(4, 4), new_entry(5, 5)];
    // (queried index, expected result)
    let cases = vec![
        (2, Err(RaftError::Store(StorageError::Compacted))),
        (3, Ok(3)),
        (4, Ok(4)),
        (5, Ok(5)),
        (6, Err(RaftError::Store(StorageError::Unavailable))),
    ];

    for (i, (idx, wterm)) in cases.into_iter().enumerate() {
        let storage = MemStorage::new();
        storage.wl().entries = ents.clone();

        let t = storage.term(idx);
        assert!(t == wterm, "#{}: expect res {:?}, got {:?}", i, wterm, t);
    }
}
/// Table-driven check of `Storage::entries` over a log holding indexes 3..=6: range
/// selection, the `Compacted` error, and truncation by `max_size`.
#[test]
fn test_storage_entries() {
default_logger().new(o!("test" => "storage_entries"));
let ents = vec![
new_entry(3, 3),
new_entry(4, 4),
new_entry(5, 5),
new_entry(6, 6),
];
let max_u64 = u64::max_value();
// Each case is (low, high, max_size, expected result).
let mut tests = vec![
// `low` below the first stored index is reported as compacted.
(
2,
6,
max_u64,
Err(RaftError::Store(StorageError::Compacted)),
),
// Plain half-open ranges with no effective size limit.
(3, 4, max_u64, Ok(vec![new_entry(3, 3)])),
(4, 5, max_u64, Ok(vec![new_entry(4, 4)])),
(4, 6, max_u64, Ok(vec![new_entry(4, 4), new_entry(5, 5)])),
(
4,
7,
max_u64,
Ok(vec![new_entry(4, 4), new_entry(5, 5), new_entry(6, 6)]),
),
// A zero size limit is expected to still yield the first entry.
(4, 7, 0, Ok(vec![new_entry(4, 4)])),
// A limit of exactly two entries' size yields two entries.
(
4,
7,
u64::from(size_of(&ents[1]) + size_of(&ents[2])),
Ok(vec![new_entry(4, 4), new_entry(5, 5)]),
),
// A limit that only partially covers the third entry still yields two entries.
(
4,
7,
u64::from(size_of(&ents[1]) + size_of(&ents[2]) + size_of(&ents[3]) / 2),
Ok(vec![new_entry(4, 4), new_entry(5, 5)]),
),
(
4,
7,
u64::from(size_of(&ents[1]) + size_of(&ents[2]) + size_of(&ents[3]) - 1),
Ok(vec![new_entry(4, 4), new_entry(5, 5)]),
),
// A limit covering all three entries yields all of them.
(
4,
7,
u64::from(size_of(&ents[1]) + size_of(&ents[2]) + size_of(&ents[3])),
Ok(vec![new_entry(4, 4), new_entry(5, 5), new_entry(6, 6)]),
),
];
for (i, (lo, hi, maxsize, wentries)) in tests.drain(..).enumerate() {
// Fresh storage per case; entries are injected directly through the write lock.
let storage = MemStorage::new();
storage.wl().entries = ents.clone();
let e = storage.entries(lo, hi, maxsize);
if e != wentries {
panic!("#{}: expect entries {:?}, got {:?}", i, wentries, e);
}
}
}
/// `last_index` reports the index of the last stored entry, before and after an append.
#[test]
fn test_storage_last_index() {
    default_logger().new(o!("test" => "storage_last_index"));
    let storage = MemStorage::new();
    storage.wl().entries = vec![new_entry(3, 3), new_entry(4, 4), new_entry(5, 5)];

    let expect_last_index = |want: u64| {
        let wresult = Ok(want);
        let result = storage.last_index();
        assert!(result == wresult, "want {:?}, got {:?}", wresult, result);
    };

    expect_last_index(5);
    storage.wl().append(&[new_entry(6, 5)]).unwrap();
    expect_last_index(6);
}
/// `first_index` reports the first stored entry index and advances after compaction.
#[test]
fn test_storage_first_index() {
    default_logger().new(o!("test" => "storage_first_index"));
    let storage = MemStorage::new();
    // Entries with index == term for 3, 4 and 5.
    storage.wl().entries = (3..=5).map(|i| new_entry(i, i)).collect();

    assert_eq!(storage.first_index(), Ok(3));

    storage.wl().compact(4).unwrap();
    assert_eq!(storage.first_index(), Ok(4));
}
/// After `compact(idx)`, checks the first index, the term of the first remaining entry, and
/// the number of remaining entries.
#[test]
fn test_storage_compact() {
    default_logger().new(o!("test" => "storage_compact"));
    let ents = vec![new_entry(3, 3), new_entry(4, 4), new_entry(5, 5)];
    // (compact index, expected first index, expected term at first index, expected length)
    let mut tests = vec![(2, 3, 3, 3), (3, 3, 3, 3), (4, 4, 4, 2), (5, 5, 5, 1)];
    for (i, (idx, windex, wterm, wlen)) in tests.drain(..).enumerate() {
        let storage = MemStorage::new();
        storage.wl().entries = ents.clone();

        storage.wl().compact(idx).unwrap();
        let index = storage.first_index().unwrap();
        if index != windex {
            panic!("#{}: want {}, index {}", i, windex, index);
        }
        let term = if let Ok(v) = storage.entries(index, index + 1, 1) {
            v.first().map_or(0, |e| e.term)
        } else {
            0
        };
        if term != wterm {
            panic!("#{}: want {}, term {}", i, wterm, term);
        }
        let last = storage.last_index().unwrap();
        let len = storage.entries(index, last + 1, 100).unwrap().len();
        if len != wlen {
            // This check compares entry counts, so the message labels the value as `len`.
            panic!("#{}: want {}, len {}", i, wlen, len);
        }
    }
}
/// `Storage::snapshot` mirrors the commit index/term and configuration state, raises the
/// index to the requested one when needed, and fails once when the unavailable trigger is
/// armed.
#[test]
fn test_storage_create_snapshot() {
    default_logger().new(o!("test" => "storage_create_snapshot"));
    let ents = vec![new_entry(3, 3), new_entry(4, 4), new_entry(5, 5)];
    let nodes = vec![1, 2, 3];
    let mut conf_state = ConfState::default();
    conf_state.nodes = nodes.clone();

    let unavailable = Err(RaftError::Store(
        StorageError::SnapshotTemporarilyUnavailable,
    ));
    // (commit index and term, expected result, requested index)
    let cases = vec![
        (4, Ok(new_snapshot(4, 4, nodes.clone())), 0),
        (5, Ok(new_snapshot(5, 5, nodes.clone())), 5),
        (5, Ok(new_snapshot(6, 5, nodes.clone())), 6),
        (5, unavailable, 6),
    ];

    for (i, (idx, wresult, windex)) in cases.into_iter().enumerate() {
        let storage = MemStorage::new();
        {
            // Prepare the core under a single write lock.
            let mut core = storage.wl();
            core.entries = ents.clone();
            core.raft_state.hard_state.commit = idx;
            core.raft_state.hard_state.term = idx;
            core.raft_state.conf_state = conf_state.clone();
            if wresult.is_err() {
                core.trigger_snap_unavailable();
            }
        }

        let result = storage.snapshot(windex);
        assert!(
            result == wresult,
            "#{}: want {:?}, got {:?}",
            i,
            wresult,
            result
        );
    }
}
/// Table-driven check of `MemStorageCore::append` against a log holding indexes 3..=5.
/// `None` as the expected value means the append is expected to panic.
#[test]
fn test_storage_append() {
default_logger().new(o!("test" => "storage_append"));
let ents = vec![new_entry(3, 3), new_entry(4, 4), new_entry(5, 5)];
// Each case is (entries to append, expected log afterwards or `None` for a panic).
let mut tests = vec![
// Appending identical entries leaves the log unchanged.
(
vec![new_entry(3, 3), new_entry(4, 4), new_entry(5, 5)],
Some(vec![new_entry(3, 3), new_entry(4, 4), new_entry(5, 5)]),
),
// Same indexes with different terms overwrite the stored entries.
(
vec![new_entry(3, 3), new_entry(4, 6), new_entry(5, 6)],
Some(vec![new_entry(3, 3), new_entry(4, 6), new_entry(5, 6)]),
),
// Overlapping entries that extend past the end grow the log.
(
vec![
new_entry(3, 3),
new_entry(4, 4),
new_entry(5, 5),
new_entry(6, 5),
],
Some(vec![
new_entry(3, 3),
new_entry(4, 4),
new_entry(5, 5),
new_entry(6, 5),
]),
),
// Starting below the first stored index panics.
(
vec![new_entry(2, 3), new_entry(3, 3), new_entry(4, 5)],
None,
),
// Appending in the middle truncates everything from that index on.
(
vec![new_entry(4, 5)],
Some(vec![new_entry(3, 3), new_entry(4, 5)]),
),
// Appending directly after the last entry extends the log.
(
vec![new_entry(6, 6)],
Some(vec![
new_entry(3, 3),
new_entry(4, 4),
new_entry(5, 5),
new_entry(6, 6),
]),
),
];
for (i, (entries, wentries)) in tests.drain(..).enumerate() {
let storage = MemStorage::new();
storage.wl().entries = ents.clone();
// Catch the panic so the remaining cases still run.
let res = panic::catch_unwind(AssertUnwindSafe(|| storage.wl().append(&entries)));
if let Some(wentries) = wentries {
assert!(res.is_ok());
let e = &storage.wl().entries;
if *e != wentries {
panic!("#{}: want {:?}, entries {:?}", i, wentries, e);
}
} else {
assert!(res.is_err());
}
}
}
/// Applying a snapshot succeeds on a fresh storage, while a subsequent snapshot with a lower
/// index is rejected.
#[test]
fn test_storage_apply_snapshot() {
    default_logger().new(o!("test" => "storage_apply_snapshot"));
    let storage = MemStorage::new();
    let nodes: Vec<u64> = vec![1, 2, 3];

    // A snapshot at index 4 is accepted by the empty storage.
    {
        let snap = new_snapshot(4, 4, nodes.clone());
        assert!(storage.wl().apply_snapshot(snap).is_ok());
    }

    // An older snapshot (index 3) is now out of date.
    {
        let snap = new_snapshot(3, 3, nodes.clone());
        assert!(storage.wl().apply_snapshot(snap).is_err());
    }
}
}