use std::cmp::{max, min};
use std::fmt::{Debug, Write};
use std::slice::Iter;
use log::{debug, error};
use serde::de::DeserializeOwned;
use dcs::communication::messages::{UpdateClusterVec, COORDINATION_MESSAGE_OPCODE, Header, IdentificableMessage, Package, PackageBuilder};
use dcs::heapless;
use dcs::nodes::SystemNodeId;
use dcs::properties::MEASUREMENTS_MAX_COUNT;
use dcs::rules::measurements::{Measurement, SystemState};
use dcs::rules::strategy::Rule;
use crate::server::{LogData, Merge, NoOp};
use super::*;
/// Raft term number carried by log entries and protocol messages.
pub type Term = u16;
/// Logical position of an entry inside a [`Log`]; the `Log` accessors reject index 0.
pub type LogIndex = u32;
/// Fixed capacity (in entries) of the backing storage of a [`Log`].
pub const LOG_LEN: usize = CLUSTER_NODE_COUNT * MEASUREMENTS_MAX_COUNT / 3;
/// Fixed-capacity sequence of [`LogEntry`] values addressed by logical index.
///
/// A logical index `idx` maps to the storage position `idx - index_shift`
/// (clamped at zero), so every index at or below `index_shift` resolves to
/// the first stored entry.
#[derive(Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
pub struct Log<T: LogData> {
// Backing storage, bounded by `LOG_LEN`.
vec: heapless::Vec<LogEntry<T>, LOG_LEN>,
// Logical index of the entry stored at position 0; starts at 1.
index_shift: u32,
}
impl<T: LogData> Default for Log<T> {
    /// Produces the same empty log as [`Log::new`]: no entries, first index 1.
    fn default() -> Self {
        Self::new()
    }
}
impl<T: LogData> Log<T> {
    /// Creates an empty log whose first entry will live at logical index 1.
    pub fn new() -> Log<T> {
        Self {
            vec: Default::default(),
            index_shift: 1,
        }
    }

    /// Alias of [`Log::new`]: an empty log.
    pub fn noop() -> Log<T> {
        Self::new()
    }

    /// Appends `data` at the end of the log.
    ///
    /// # Errors
    /// Hands the entry back when the backing storage is already full.
    pub fn push(&mut self, data: LogEntry<T>) -> Result<(), LogEntry<T>> {
        self.vec.push(data)
    }

    /// Overwrites the entry at logical index `idx`, or appends `data` when
    /// `idx` lies past the current end of the log.
    ///
    /// A failed append (log full) is reported through the `error!` log and the
    /// entry is dropped.
    ///
    /// # Panics
    /// Panics if `idx` is 0.
    pub fn insert(&mut self, idx: LogIndex, data: LogEntry<T>) {
        assert!(idx > 0);
        let pos: usize = self.shift_idx(idx);
        if pos < self.len() {
            self.vec[pos] = data
        } else if self.vec.push(data).is_err() {
            error!("Couldn't append element to log: max capacity of {} reached.", self.vec.len())
        }
    }

    /// Returns `true` when `idx` is at or below the index of the first stored
    /// entry.
    ///
    /// NOTE(review): on a log that was never compacted `index_shift` is 1, so
    /// index 1 is reported as a snapshot as well — confirm callers expect this.
    pub fn is_snapshot(&self, idx: LogIndex) -> bool {
        idx <= self.index_shift
    }

    /// Logical index of the first stored entry (the snapshot, once one exists).
    pub fn last_included_index(&self) -> u32 {
        self.index_shift
    }

    /// Term of the snapshot entry, or 0 when no snapshot has been taken.
    ///
    /// Also returns 0 if the log has been emptied (e.g. through [`Log::pop`])
    /// after a snapshot, instead of panicking on the missing first entry.
    pub fn last_included_term(&self) -> Term {
        if self.index_shift > 1 {
            self.vec.first().map_or(0, |entry| entry.term)
        } else {
            0
        }
    }

    /// Compacts every entry up to logical index `idx` (inclusive) into a single
    /// entry stored at the front of the log.
    ///
    /// The merged entry carries the highest term seen, the `merge` of all
    /// present data payloads (in log order), and the most recent config change
    /// and rule. Calls with `idx == 1` or on an empty log leave the log untouched.
    ///
    /// # Panics
    /// Panics if `idx` is 0.
    pub fn snapshot(&mut self, idx: LogIndex) {
        assert!(idx > 0);
        if idx < 2 {
            return;
        }
        let snapshot_count = min(
            (self.shift_idx(idx) as u32).saturating_add(1),
            self.len() as u32,
        );
        if snapshot_count == 0 {
            // Empty log: nothing to compact. Inserting a merged entry here
            // would fabricate an entry that was never appended.
            return;
        }
        let mut data_accum = None::<T>;
        let mut term_accum = Term::default();
        let mut rule_accum = None;
        let mut config_change_accum = None;
        for _ in 0..snapshot_count {
            let entry = self.vec.remove(0);
            data_accum = match (data_accum, entry.data) {
                (Some(acc), Some(d)) => Some(acc.merge(d)),
                (acc, d) => acc.or(d),
            };
            term_accum = max(entry.term, term_accum);
            // Later entries win over earlier ones.
            config_change_accum = entry.config_change.or(config_change_accum);
            rule_accum = entry.rule.or(rule_accum);
        }
        // At least one slot was freed above, so this insertion cannot overflow.
        let _ = self.vec.insert(
            0,
            LogEntry::new(term_accum, data_accum, config_change_accum, rule_accum),
        );
        // `snapshot_count` entries collapsed into one.
        self.index_shift += snapshot_count - 1;
    }

    /// Replaces every entry up to `last_included_index` (inclusive) with the
    /// externally provided snapshot `entry`, which becomes the first stored
    /// entry at logical index `last_included_index`.
    ///
    /// NOTE(review): a `last_included_index` lower than the current
    /// `last_included_index()` moves the log's base index backwards; callers
    /// presumably filter out stale snapshots — confirm.
    pub fn install_snapshot(&mut self, last_included_index: LogIndex, entry: LogEntry<T>) {
        let snapshot_count = min(
            (self.shift_idx(last_included_index) as u32).saturating_add(1),
            self.len() as u32,
        );
        for _ in 0..snapshot_count {
            self.vec.remove(0);
        }
        self.index_shift = last_included_index;
        // Either a slot was just freed or the log was empty, so there is room.
        let _ = self.vec.insert(0, entry);
    }

    /// Fraction of the storage that is still free: `1.0` for an empty log,
    /// `0.0` for a full one.
    pub fn capacity(&self) -> f32 {
        1.0 - (self.len() as f32 / LOG_LEN as f32)
    }

    /// Translates a logical index into a storage position, clamping indices
    /// below `index_shift` to position 0.
    fn shift_idx(&self, idx: LogIndex) -> usize {
        idx.saturating_sub(self.index_shift) as usize
    }

    /// Removes and returns the last stored entry, if any.
    pub fn pop(&mut self) -> Option<LogEntry<T>> {
        self.vec.pop()
    }

    /// Returns the entry at logical index `idx`, or `None` past the end.
    ///
    /// # Panics
    /// Panics if `idx` is 0.
    pub fn get(&self, idx: LogIndex) -> Option<&LogEntry<T>> {
        assert!(idx > 0);
        self.vec.get(self.shift_idx(idx))
    }

    /// Number of entries physically stored (a snapshot counts as one).
    pub fn len(&self) -> usize {
        self.vec.len()
    }

    /// Logical index of the last stored entry; 0 for a fresh, empty log.
    pub fn last_index(&self) -> usize {
        (self.vec.len() + self.index_shift as usize).saturating_sub(1)
    }

    /// `true` when no entry is stored.
    pub fn is_empty(&self) -> bool {
        self.vec.is_empty()
    }

    /// Borrows the last stored entry, if any.
    pub fn last(&self) -> Option<&LogEntry<T>> {
        self.vec.last()
    }

    /// Iterates over the stored entries in log order.
    pub fn iter(&self) -> Iter<'_, LogEntry<T>> {
        self.vec.iter()
    }
}
impl<T: LogData, I: Iterator<Item = LogEntry<T>>> From<I> for Log<T> {
fn from(value: I) -> Self {
let mut log = Self::new();
for i in value {
log.push(i);
}
log
}
}
impl<T: LogData> IntoIterator for Log<T> {
type Item = LogEntry<T>;
type IntoIter = IntoIter<T>;
fn into_iter(self) -> Self::IntoIter {
IntoIter {
idx: 0,
vec: self.vec,
}
}
}
/// Owning iterator over the entries of a [`Log`], created by `Log::into_iter`.
pub struct IntoIter<T: LogData> {
// Position of the next entry to yield.
idx: usize,
// Storage taken over from the consumed log.
vec: heapless::Vec<LogEntry<T>, LOG_LEN>,
}
impl<T: LogData> Iterator for IntoIter<T> {
    type Item = LogEntry<T>;

    /// Yields a clone of the next stored entry, or `None` once exhausted.
    ///
    /// The cursor only advances when an entry was produced, so it never runs
    /// past the end of the storage no matter how often `next` is called.
    fn next(&mut self) -> Option<Self::Item> {
        let current = self.vec.get(self.idx).cloned()?;
        self.idx += 1;
        Some(current)
    }

    /// Exact number of entries still to be yielded.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let remaining = self.vec.len().saturating_sub(self.idx);
        (remaining, Some(remaining))
    }
}
/// One record of the replicated log. Each payload is optional, so an entry
/// may carry data, a configuration change, a rule, any combination, or nothing.
#[derive(Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
pub struct LogEntry<T: LogData> {
/// Term attached to the entry.
pub term: Term,
/// Application data payload, if any.
pub data: Option<T>,
/// Cluster membership update, if any.
pub config_change: Option<UpdateClusterVec>,
/// Rule update, if any.
pub rule: Option<Rule>,
}
impl<T: LogData> LogEntry<T> {
    /// Builds an entry from all of its parts.
    pub fn new(
        term: Term,
        data: Option<T>,
        config_change: Option<UpdateClusterVec>,
        rule: Option<Rule>,
    ) -> Self {
        Self { term, data, config_change, rule }
    }

    /// Entry that only carries (optional) data.
    pub fn with_data(term: Term, data: Option<T>) -> Self {
        Self::new(term, data, None, None)
    }

    /// Entry that only carries a rule.
    pub fn with_rule(term: Term, rule: Rule) -> Self {
        Self::new(term, None, None, Some(rule))
    }

    /// Entry that only carries an (optional) configuration change.
    pub fn with_config(term: Term, config_change: Option<UpdateClusterVec>) -> Self {
        Self::new(term, None, config_change, None)
    }
}
/// Payload of `RaftMessage::RequestVote`.
#[derive(Copy, Clone, Eq, PartialEq, Debug, Serialize, Deserialize)]
pub struct RequestVoteArgs {
/// Term of the sender — presumably the candidate's term; TODO confirm.
pub term: Term,
/// Log index sent with the request — presumably the sender's last log index; TODO confirm.
pub prev_log_index: LogIndex,
/// Term associated with `prev_log_index`.
pub prev_log_term: Term,
}
/// Payload of `RaftMessage::AppendLog`.
#[derive(Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
pub struct AppendLogArgs<T: LogData> {
/// Term of the sender.
pub term: Term,
/// Index of the entry preceding `entries`; the `Display` impl of
/// `RaftMessage` reports `prev_log_index + 1` as the message index.
pub prev_log_index: LogIndex,
/// Term associated with `prev_log_index`.
pub prev_log_term: Term,
/// Entries shipped with the message.
pub entries: Log<T>,
/// Commit index of the sender — presumably the leader's; TODO confirm.
pub leader_commit: LogIndex,
}
/// Every message exchanged by the coordination protocol, generic over the
/// log payload type `T`.
#[derive(Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
pub enum RaftMessage<T>
where
T: LogData,
{
/// Vote request.
RequestVote(RequestVoteArgs),
/// Answer to a vote request.
RequestVoteResponse(RequestVoteResponseResult),
/// Log replication request.
AppendLog(AppendLogArgs<T>),
/// Answer to a log replication request.
AppendLogResponse(AppendLogResponseResult),
/// Request to read data; carries no payload.
ReadRequest,
/// Answer to a read request.
ReadRequestReply(ReadRequestReplyArgs<T>),
/// Request to write a measurement or a rule.
WriteRequest(WriteRequestArgs),
/// Answer to a write request: success flag plus an optional node id
/// (presumably a redirect target — TODO confirm).
WriteRequestReply(bool, Option<SystemNodeId>),
/// Request to change the cluster membership.
ConfigChange(UpdateClusterVec),
/// Answer to a membership change; the flag reports success.
ConfigChangeACK(bool),
/// Request to install a snapshot.
InstallSnapshot(InstallSnapshotArgs<T>),
/// Answer to a snapshot installation; carries a term
/// (presumably the responder's — TODO confirm).
InstallSnapshotResponse(Term),
}
impl<T: LogData> Display for RaftMessage<T> {
    /// Writes a short, human-readable summary of the message: the variant
    /// name followed by its most relevant fields.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        // Shared wording for every boolean outcome.
        let outcome = |ok: bool| if ok { "granted" } else { "rejected" };
        match self {
            RaftMessage::RequestVote(args) => {
                write!(f, "RequestVote term: {}", args.term)
            }
            RaftMessage::RequestVoteResponse(args) => {
                write!(f, "RequestVoteResponse {}", outcome(args.granted))
            }
            RaftMessage::AppendLog(args) => {
                write!(f, "AppendLog term: {} index: {}", args.term, args.prev_log_index + 1)
            }
            RaftMessage::AppendLogResponse(args) => {
                write!(f, "AppendLogResponse {}", outcome(args.success))
            }
            RaftMessage::ReadRequest => f.write_str("ReadRequest"),
            RaftMessage::ReadRequestReply(args) => {
                write!(f, "ReadRequestReply {}", outcome(args.success))
            }
            RaftMessage::WriteRequest(args) => {
                // A request without measurement prints empty value and type.
                let value = args.measurement.map(|m| m.value.to_string()).unwrap_or_default();
                let measurement_type = args.measurement.map(|m| m.typed.to_string()).unwrap_or_default();
                write!(f, "WriteRequest value: {} type: {:?}", value, measurement_type)
            }
            RaftMessage::WriteRequestReply(success, _) => {
                write!(f, "WriteRequestReply {}", outcome(*success))
            }
            RaftMessage::ConfigChange(args) => {
                f.write_str("ConfigChange new cluster: ")?;
                for id in args.iter() {
                    write!(f, "{id} ")?;
                }
                Ok(())
            }
            RaftMessage::ConfigChangeACK(success) => {
                write!(f, "ConfigChangeACK {}", outcome(*success))
            }
            RaftMessage::InstallSnapshot(_) => f.write_str("InstallSnapshot"),
            // Previously printed "ConfigChangeACK", which made the two
            // message kinds indistinguishable in logs.
            RaftMessage::InstallSnapshotResponse(_) => f.write_str("InstallSnapshotResponse"),
        }
    }
}
impl<T> Default for RaftMessage<T>
where
    T: Clone + Debug + Default + Serialize + DeserializeOwned + LogData,
{
    /// The default message is the payload-free `ReadRequest`.
    fn default() -> Self {
        Self::ReadRequest
    }
}
impl<T> IdentificableMessage for RaftMessage<T>
where
    T: LogData + Clone + Debug + Default + Serialize + DeserializeOwned,
{
    /// All Raft messages share the coordination opcode.
    fn id() -> u8 {
        COORDINATION_MESSAGE_OPCODE
    }
}
/// Step-by-step builder for a `Package` carrying a [`RaftMessage`].
/// Every part starts unset; `build` fails until all three are provided.
pub struct RaftPackageBuilder<T: LogData> {
// Destination node.
to: Option<SystemNodeId>,
// Originating node.
from: Option<SystemNodeId>,
// Message to deliver.
body: Option<RaftMessage<T>>,
}
impl<T: LogData> Default for RaftPackageBuilder<T> {
    /// A builder with neither addresses nor body set.
    fn default() -> Self {
        let (to, from, body) = (None, None, None);
        Self { to, from, body }
    }
}
impl<T: LogData> PackageBuilder for RaftPackageBuilder<T> {
    type NodeId = SystemNodeId;
    type Message = RaftMessage<T>;

    /// Returns a fresh builder; nothing is carried over from `self`.
    fn clean_copy(&self) -> Self {
        <Self as Default>::default()
    }

    /// Sets the destination node.
    fn to(self, id: SystemNodeId) -> Self {
        Self { to: Some(id), ..self }
    }

    /// Sets the originating node.
    fn from(self, id: SystemNodeId) -> Self {
        Self { from: Some(id), ..self }
    }

    /// Sets the message to deliver.
    fn with_message(self, msg: RaftMessage<T>) -> Self {
        Self { body: Some(msg), ..self }
    }

    /// Addresses the package to the sender of `package`.
    fn respond_to(self, package: Package<Self::NodeId, Self::Message>) -> Self {
        let sender = package.header.from;
        self.to(sender)
    }

    /// Assembles the package.
    ///
    /// # Errors
    /// Returns `Err(())` when the origin, the destination or the body is missing.
    fn build(self) -> Result<Package<Self::NodeId, Self::Message>, ()> {
        match (self.from, self.to, self.body) {
            (Some(from), Some(to), Some(body)) => Ok(Package {
                header: Header { from, to },
                body,
            }),
            _ => Err(()),
        }
    }
}
/// Payload of `RaftMessage::InstallSnapshot`.
#[derive(Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
pub struct InstallSnapshotArgs<T: LogData> {
/// Term of the sender.
pub term: Term,
/// Node id of the leader.
pub leader_id: SystemNodeId,
/// Log index covered by the snapshot.
pub last_included_index: LogIndex,
/// Term associated with `last_included_index`.
pub last_included_term: Term,
/// The snapshot itself, encoded as a single log entry.
pub data: LogEntry<T>,
}
/// Payload of `RaftMessage::ReadRequestReply`.
#[derive(Eq, PartialEq, Debug, Copy, Clone, Serialize, Deserialize)]
pub struct ReadRequestReplyArgs<T> {
/// Whether the read was served.
pub success: bool,
/// Data returned by the read, if any.
pub data: Option<T>,
/// Node to retry against; `fail` stores the known leader id here.
pub redirect: Option<SystemNodeId>,
}
impl<T> ReadRequestReplyArgs<T> {
    /// Builds an unsuccessful reply without data that points the requester
    /// at `leader_id` (when known).
    pub fn fail(leader_id: Option<SystemNodeId>) -> Self {
        let redirect = leader_id;
        Self {
            redirect,
            data: None,
            success: false,
        }
    }
}
/// Payload of `RaftMessage::RequestVoteResponse`.
#[derive(Eq, PartialEq, Debug, Copy, Clone, Serialize, Deserialize)]
pub struct RequestVoteResponseResult {
/// Term reported by the responder.
pub term: Term,
/// Whether the vote was granted.
pub granted: bool,
}
/// Payload of `RaftMessage::AppendLogResponse`.
#[derive(Eq, PartialEq, Debug, Copy, Clone, Serialize, Deserialize)]
pub struct AppendLogResponseResult {
/// Term reported by the responder.
pub term: Term,
/// Whether the append was accepted.
pub success: bool,
}
/// Payload of `RaftMessage::WriteRequest`. The provided constructors set
/// exactly one of `measurement` and `rule`.
#[derive(Eq, PartialEq, Debug, Copy, Clone, Serialize, Deserialize)]
pub struct WriteRequestArgs {
/// Measurement to write, if any.
pub measurement: Option<Measurement>,
/// Rule to write, if any.
pub rule: Option<Rule>,
/// Node id attached to the request — presumably the requester; TODO confirm.
pub id: SystemNodeId,
}
impl WriteRequestArgs {
    /// Write request that carries only a rule.
    pub fn with_rule(id: SystemNodeId, rule: Rule) -> Self {
        let rule = Some(rule);
        Self { id, rule, measurement: None }
    }

    /// Write request that carries only a measurement.
    pub fn with_measurement(id: SystemNodeId, measurement: Measurement) -> Self {
        let measurement = Some(measurement);
        Self { id, measurement, rule: None }
    }
}
/// Unit tests for `Log`: indexing rules, snapshot compaction and snapshot
/// installation.
#[cfg(test)]
mod log_tests {
    use crate::messages::{UpdateClusterVec, Log, LogEntry, Term, LOG_LEN};
    use crate::server::{Merge, NoOp};
    use dcs::nodes::SystemNodeId;
    use dcs::rules::measurements::{Measurement, SystemState};
    use serde::{Serialize, Serializer};
    use std::fmt::Debug;
    use dcs::rules::strategy::{Rule, RuleType};

    /// Minimal payload: merging two values adds them, which makes the content
    /// of a snapshot easy to predict.
    #[derive(Eq, PartialEq, Copy, Clone, Debug, Serialize)]
    struct TestLogData(usize);

    impl NoOp for TestLogData {
        fn noop() -> Self {
            Self(0)
        }
    }

    impl Merge for TestLogData {
        fn merge(self, rhs: Self) -> Self {
            Self(self.0 + rhs.0)
        }
    }

    // The conversions below are never exercised by these tests.
    impl From<(SystemNodeId, Measurement)> for TestLogData {
        fn from(value: (SystemNodeId, Measurement)) -> Self {
            unreachable!()
        }
    }

    impl Into<SystemState> for TestLogData {
        fn into(self) -> SystemState {
            unreachable!()
        }
    }

    #[test]
    pub fn new_log_is_empty() {
        let log = Log::<TestLogData>::new();
        assert!(log.is_empty())
    }

    #[test]
    pub fn can_insert() {
        let mut log = Log::<TestLogData>::new();
        let data = some_entry();
        log.push(data);
        assert!(!log.is_empty())
    }

    #[test]
    #[should_panic]
    pub fn zero_is_not_valid_index_when_getting() {
        let mut log = Log::<TestLogData>::new();
        log.push(some_entry()).expect("Failed push");
        let result = log.get(0);
    }

    #[test]
    #[should_panic]
    pub fn zero_is_not_valid_index_when_inserting() {
        let mut log = Log::<TestLogData>::new();
        // The panic must come from `insert` itself; the previous version
        // inserted at index 1 and only panicked on a follow-up `get(0)`.
        log.insert(0, some_entry());
    }

    #[test]
    pub fn one_is_the_first_index() {
        let mut log = Log::<TestLogData>::new();
        log.push(some_entry()).expect("Failed push");
        let result = log.get(1).cloned();
        assert_eq!(Some(some_entry()), result);
    }

    #[test]
    pub fn when_inserting_in_position_greater_than_length_then_is_the_same_as_pushing() {
        let mut log = Log::<TestLogData>::new();
        log.insert(32, some_entry());
        assert!(log.get(32).is_none());
        assert!(log.get(1).is_some());
    }

    #[test]
    pub fn when_inserting_in_existing_position_replaces_data() {
        let mut log = Log::<TestLogData>::new();
        log.push(entry(1));
        log.push(entry(2));
        log.push(entry(3));
        log.insert(2, entry(42));
        assert_eq!(&entry(3), log.get(3).unwrap());
        assert_eq!(&entry(42), log.get(2).unwrap());
        assert_eq!(&entry(1), log.get(1).unwrap());
    }

    #[test]
    #[should_panic]
    pub fn zero_is_not_valid_index_for_snapshot() {
        let mut log = Log::<TestLogData>::new();
        log.snapshot(0);
    }

    #[test]
    pub fn can_calculate_log_capacity() {
        let mut log = Log::<TestLogData>::new();
        log.push(entry(1));
        log.push(entry(2));
        log.push(entry(3));
        assert!((1.0 - (3.0 / LOG_LEN as f32) - log.capacity()).abs() < 0.0001);
    }

    #[test]
    pub fn snapshot_empty_log_leaves_everything_untouched() {
        let mut log = Log::<TestLogData>::new();
        log.snapshot(1);
        log.push(entry(1));
        log.push(entry(2));
        log.push(entry(3));
        assert_eq!(&entry(3), log.get(3).unwrap());
        assert_eq!(&entry(2), log.get(2).unwrap());
        assert_eq!(&entry(1), log.get(1).unwrap());
    }

    #[test]
    pub fn snapshot_log_with_one_element_leaves_everything_untouched() {
        let mut log = Log::<TestLogData>::new();
        log.push(entry(1));
        log.snapshot(1);
        log.push(entry(2));
        log.push(entry(3));
        assert_eq!(&entry(3), log.get(3).unwrap());
        assert_eq!(&entry(2), log.get(2).unwrap());
        assert_eq!(&entry(1), log.get(1).unwrap());
    }

    #[test]
    pub fn can_get_value_not_snapshotted() {
        let mut log = Log::<TestLogData>::new();
        log.push(entry(1));
        log.push(entry(2));
        log.push(entry(3));
        log.snapshot(2);
        assert_eq!(&entry(3), log.get(3).unwrap());
    }

    #[test]
    pub fn all_snapshotted_values_are_the_same() {
        let mut log = Log::<TestLogData>::new();
        log.push(entry(1));
        log.push(entry(2));
        log.push(entry(3));
        log.snapshot(3);
        assert_eq!(log.get(1).unwrap(), log.get(2).unwrap());
        assert_eq!(log.get(2).unwrap(), log.get(3).unwrap());
    }

    #[test]
    pub fn can_check_if_entry_was_snapshotted() {
        let mut log = Log::<TestLogData>::new();
        log.push(entry(1));
        log.push(entry(2));
        log.push(entry(3));
        log.snapshot(2);
        assert!(log.is_snapshot(2));
        assert!(!log.is_snapshot(3));
    }

    #[test]
    pub fn can_get_last_included_index() {
        let mut log = Log::<TestLogData>::new();
        log.push(entry(1));
        log.push(entry(2));
        log.push(entry(3));
        log.snapshot(2);
        assert_eq!(2, log.last_included_index());
    }

    #[test]
    pub fn can_get_last_included_index_after_multiple_snapshots() {
        let mut log = Log::<TestLogData>::new();
        log.push(entry(1));
        log.push(entry(2));
        log.push(entry(3));
        log.snapshot(3);
        log.push(entry(4));
        log.push(entry(5));
        log.push(entry(6));
        log.snapshot(6);
        assert_eq!(6, log.last_included_index());
        // 1 + 2 + 3 + 4 + 5 + 6
        assert_eq!(&entry(21), log.get(6).unwrap());
    }

    #[test]
    pub fn snapshot_is_the_merge_of_entries_up_to_idx() {
        let mut log = Log::<TestLogData>::new();
        log.push(entry(1));
        log.push(entry(2));
        log.push(entry(3));
        log.snapshot(3);
        assert_eq!(&entry(6), log.get(1).unwrap());
    }

    #[test]
    pub fn different_indices_generate_different_snapshot_data() {
        let mut log = Log::<TestLogData>::new();
        log.push(entry(1));
        log.push(entry(2));
        log.push(entry(3));
        let mut log2 = log.clone();
        log.snapshot(3);
        log2.snapshot(2);
        assert_ne!(log.get(1), log2.get(1));
        assert_eq!(&entry(6), log.get(1).unwrap());
        assert_eq!(&entry(3), log2.get(1).unwrap());
    }

    #[test]
    pub fn the_snapshot_term_is_the_greater_term_of_all_entries() {
        let mut log = Log::<TestLogData>::new();
        log.push(entry_with_term(0, 0));
        log.push(entry_with_term(1, 0));
        log.push(entry_with_term(2, 0));
        log.snapshot(3);
        assert_eq!(&entry_with_term(2, 0), log.get(1).unwrap());
    }

    #[test]
    pub fn different_indices_generate_different_snapshot_term() {
        let mut log = Log::<TestLogData>::new();
        log.push(entry_with_term(0, 1));
        log.push(entry_with_term(1, 2));
        log.push(entry_with_term(2, 3));
        let mut log2 = log.clone();
        log.snapshot(3);
        log2.snapshot(2);
        assert_eq!(&entry_with_term(2, 6), log.get(1).unwrap());
        assert_eq!(&entry_with_term(1, 3), log2.get(1).unwrap());
    }

    #[test]
    pub fn the_snapshot_change_request_is_the_latest_change_req() {
        let mut log = Log::<TestLogData>::new();
        log.push(config_change(&[1]));
        log.push(config_change(&[1, 2]));
        log.push(config_change(&[1, 2, 3]));
        log.snapshot(3);
        assert_eq!(&config_change(&[1, 2, 3]), log.get(1).unwrap());
    }

    /// Builds a cluster vector holding the given node ids.
    fn update_cluster_vector(slice: Vec<u32>) -> Option<UpdateClusterVec> {
        let mut cluster = UpdateClusterVec::new();
        for elem in slice {
            cluster.push(SystemNodeId::from(elem));
        }
        Some(cluster)
    }

    #[test]
    pub fn entries_with_data_config_change_and_rule() {
        let mut log = Log::<TestLogData>::new();
        log.push(LogEntry::new(0, Some(TestLogData::noop()), update_cluster_vector(vec!(1, 2)), Some(rule(1))));
        log.push(LogEntry::new(1, Some(TestLogData(2)), None, None));
        log.push(LogEntry::new(2, None, update_cluster_vector(vec!(1, 2, 3)), Some(rule(2))));
        log.push(LogEntry::new(3, None, None, None));
        log.snapshot(4);
        let expected_snapshot = LogEntry::new(
            3,
            Some(TestLogData(2)),
            update_cluster_vector(vec!(1, 2, 3)),
            Some(rule(2)),
        );
        assert_eq!(&expected_snapshot, log.get(1).unwrap());
    }

    /// Entry of term 0 that only carries a cluster configuration.
    fn config_change(config: &[u32]) -> LogEntry<TestLogData> {
        LogEntry::with_config(0, update_cluster_vector(config.to_vec()))
    }

    #[test]
    pub fn snapshot_several_times() {
        let mut log = Log::<TestLogData>::new();
        log.push(LogEntry::new(1, Some(TestLogData::noop()), update_cluster_vector(vec!(1, 2)), None));
        log.push(LogEntry::new(2, Some(TestLogData(2)), None, None));
        log.push(LogEntry::new(3, None, update_cluster_vector(vec!(1, 2, 3)), None));
        log.push(LogEntry::new(4, None, None, None));
        log.snapshot(4);
        log.push(LogEntry::new(5, Some(TestLogData(3)), None, None));
        log.push(LogEntry::new(6, None, update_cluster_vector(vec!(1, 2, 3, 4)), None));
        log.push(LogEntry::new(7, None, None, None));
        log.push(LogEntry::new(8, Some(TestLogData(5)), None, None));
        log.snapshot(7);
        let expected_data = Some(TestLogData(5));
        let expected_config = update_cluster_vector(vec!(1, 2, 3, 4));
        let expected_snapshot = LogEntry::new(7, expected_data, expected_config, None);
        assert_eq!(&expected_snapshot, log.get(1).unwrap());
        assert_eq!(
            &LogEntry::new(8, Some(TestLogData(5)), None, None),
            log.get(8).unwrap()
        );
    }

    #[test]
    pub fn snapshot_log_increases_capacity() {
        let mut log = Log::<TestLogData>::new();
        log.push(entry(1));
        log.push(entry(2));
        log.push(entry(3));
        let initial_capacity = log.capacity();
        log.snapshot(2);
        let final_capacity = log.capacity();
        assert!(initial_capacity < final_capacity);
    }

    #[test]
    pub fn more_elements_in_snapshot_means_more_capacity_gained() {
        let mut log = Log::<TestLogData>::new();
        log.push(entry(1));
        log.push(entry(2));
        log.push(entry(3));
        let mut log2 = log.clone();
        log.snapshot(2);
        log2.snapshot(3);
        assert!(log.capacity() < log2.capacity());
    }

    #[test]
    pub fn if_no_snapshot_then_last_included_term_is_zero() {
        let mut log = Log::<TestLogData>::new();
        log.push(LogEntry::with_data(1, Some(TestLogData(1))));
        log.push(LogEntry::with_data(1, Some(TestLogData(2))));
        log.push(LogEntry::with_data(1, Some(TestLogData(3))));
        assert_eq!(0, log.last_included_term());
    }

    #[test]
    pub fn if_snapshot_then_last_included_term_is_given() {
        let mut log = Log::<TestLogData>::new();
        log.push(LogEntry::with_data(1, Some(TestLogData(1))));
        log.push(LogEntry::with_data(2, Some(TestLogData(2))));
        log.push(LogEntry::with_data(3, Some(TestLogData(3))));
        log.snapshot(3);
        assert_eq!(3, log.last_included_term());
    }

    #[test]
    pub fn install_snapshot() {
        let mut log = Log::<TestLogData>::new();
        log.push(LogEntry::with_data(1, Some(TestLogData(1))));
        log.push(LogEntry::with_data(2, Some(TestLogData(2))));
        log.push(LogEntry::with_data(3, Some(TestLogData(3))));
        log.install_snapshot(2, LogEntry::with_data(2, Some(TestLogData(3))));
        assert_eq!(2, log.len());
        assert!(log.is_snapshot(1));
        assert!(log.is_snapshot(2));
        assert!(!log.is_snapshot(3));
    }

    #[test]
    pub fn append_after_install_snapshot() {
        let mut log = Log::<TestLogData>::new();
        log.push(LogEntry::with_data(1, Some(TestLogData(1))));
        log.push(LogEntry::with_data(2, Some(TestLogData(2))));
        log.push(LogEntry::with_data(3, Some(TestLogData(3))));
        log.install_snapshot(2, LogEntry::with_data(2, Some(TestLogData(3))));
        log.push(LogEntry::with_data(4, Some(TestLogData(4))));
        assert_eq!(
            &LogEntry::with_data(4, Some(TestLogData(4))),
            log.get(4).unwrap()
        )
    }

    #[test]
    pub fn multiple_install_snapshot() {
        let mut log = Log::<TestLogData>::new();
        log.push(LogEntry::with_data(1, Some(TestLogData(1))));
        log.push(LogEntry::with_data(2, Some(TestLogData(2))));
        log.push(LogEntry::with_data(3, Some(TestLogData(3))));
        log.install_snapshot(2, LogEntry::with_data(2, Some(TestLogData(3))));
        log.push(LogEntry::with_data(4, Some(TestLogData(4))));
        log.push(LogEntry::with_data(5, Some(TestLogData(5))));
        log.push(LogEntry::with_data(6, Some(TestLogData(6))));
        log.install_snapshot(5, LogEntry::with_data(5, Some(TestLogData(15))));
        assert_eq!(
            &LogEntry::with_data(5, Some(TestLogData(15))),
            log.get(5).unwrap()
        );
        assert_eq!(
            &LogEntry::with_data(6, Some(TestLogData(6))),
            log.get(6).unwrap()
        )
    }

    /// Arbitrary entry for tests that do not care about the content.
    fn some_entry() -> LogEntry<TestLogData> {
        entry(1)
    }

    /// Data-only entry of term 0 holding `value`.
    fn entry(value: usize) -> LogEntry<TestLogData> {
        LogEntry::with_data(0, Some(TestLogData(value)))
    }

    /// `MeanOver` rule with the given threshold.
    fn rule(threshold: i32) -> Rule {
        Rule {
            name: RuleType::MeanOver,
            threshold,
        }
    }

    /// Data-only entry with an explicit term.
    fn entry_with_term(term: u16, value: usize) -> LogEntry<TestLogData> {
        LogEntry::with_data(term as Term, Some(TestLogData(value)))
    }
}