dcs2_raft/
messages.rs

1use std::cmp::{max, min};
2use std::fmt::{Debug, Write};
3use std::slice::Iter;
4use log::{debug, error};
5
6use serde::de::DeserializeOwned;
7
8use dcs::communication::messages::{UpdateClusterVec, COORDINATION_MESSAGE_OPCODE, Header, IdentificableMessage, Package, PackageBuilder};
9use dcs::heapless;
10use dcs::nodes::SystemNodeId;
11use dcs::properties::MEASUREMENTS_MAX_COUNT;
12use dcs::rules::measurements::{Measurement, SystemState};
13use dcs::rules::strategy::Rule;
14
15use crate::server::{LogData, Merge, NoOp};
16
17use super::*;
18
19pub type Term = u16;
20pub type LogIndex = u32;
21
22pub const LOG_LEN: usize = CLUSTER_NODE_COUNT * MEASUREMENTS_MAX_COUNT / 3;
23
24#[derive(Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
25pub struct Log<T: LogData> {
26    vec: heapless::Vec<LogEntry<T>, LOG_LEN>,
27    index_shift: u32,
28}
29
30impl<T: LogData> Default for Log<T> {
31    fn default() -> Self {
32        Self {
33            vec: Default::default(),
34            index_shift: 1,
35        }
36    }
37}
38
39impl<T: LogData> Log<T> {
40    pub fn new() -> Log<T> {
41        Self {
42            vec: Default::default(),
43            index_shift: 1,
44        }
45    }
46    pub fn noop() -> Log<T> {
47        Self::new()
48    }
49
50    pub fn push(&mut self, data: LogEntry<T>) -> Result<(), LogEntry<T>> {
51        self.vec.push(data)
52    }
53
54    pub fn insert(&mut self, idx: LogIndex, data: LogEntry<T>) {
55        assert!(idx > 0);
56        let idx: usize = self.shift_idx(idx);
57        if idx < self.len() {
58            self.vec[idx] = data
59        } else {
60            if self.vec.push(data).is_err() {
61                error!("Couldn't append element to log: max capacity of {} reached.", self.vec.len())
62            };
63        }
64    }
65
66    pub fn is_snapshot(&self, idx: LogIndex) -> bool {
67        idx <= self.index_shift
68    }
69
70    pub fn last_included_index(&self) -> u32 {
71        self.index_shift
72    }
73
74    pub fn last_included_term(&self) -> Term {
75        if self.index_shift > 1 {
76            self.vec.first().unwrap().term
77        } else {
78            0
79        }
80    }
81
82    pub fn snapshot(&mut self, idx: LogIndex) {
83        assert!(idx > 0);
84        if idx < 2 {
85            return;
86        }
87        let snapshot_count = min(self.shift_idx(idx) as u32 + 1, self.len() as u32);
88        let mut data_accum = None::<T>;
89        let mut term_accum = Term::default();
90        let mut rule_accum = None;
91        let mut config_change_accum = None;
92        for _ in 0..snapshot_count {
93            let entry = self.vec.remove(0);
94            data_accum = match (data_accum, entry.data) {
95                (Some(acc), Some(d)) => Some(acc.merge(d)),
96                (acc, d) => acc.or(d),
97            };
98            term_accum = max(entry.term, term_accum);
99            config_change_accum = entry.config_change.or(config_change_accum);
100            rule_accum = entry.rule.or(rule_accum);
101        }
102        self.vec.insert(0, LogEntry::new(term_accum, data_accum, config_change_accum, rule_accum));
103        self.index_shift += snapshot_count.checked_sub(1).unwrap_or_default();
104    }
105
106    pub fn install_snapshot(&mut self, last_included_index: LogIndex, entry: LogEntry<T>) {
107        let snapshot_count = min(
108            self.shift_idx(last_included_index) as u32 + 1,
109            self.len() as u32,
110        );
111        for _ in 0..snapshot_count {
112            self.vec.remove(0);
113        }
114        self.index_shift = last_included_index;
115        self.vec.insert(0, entry);
116    }
117
118    pub fn capacity(&self) -> f32 {
119        1.0 - (self.len() as f32 / LOG_LEN as f32)
120    }
121
122    fn shift_idx(&self, idx: LogIndex) -> usize {
123        idx.checked_sub(self.index_shift).unwrap_or_default() as usize
124    }
125
126    pub fn pop(&mut self) -> Option<LogEntry<T>> {
127        self.vec.pop()
128    }
129
130    pub fn get(&self, idx: LogIndex) -> Option<&LogEntry<T>> {
131        assert!(idx > 0);
132        self.vec.get(self.shift_idx(idx))
133    }
134
135    pub fn len(&self) -> usize {
136        self.vec.len()
137    }
138
139    pub fn last_index(&self) -> usize {
140        (self.vec.len()  + self.index_shift as usize).checked_sub(1).unwrap_or_default()
141    }
142
143    pub fn is_empty(&self) -> bool {
144        self.vec.is_empty()
145    }
146
147    pub fn last(&self) -> Option<&LogEntry<T>> {
148        self.vec.last()
149    }
150
151    pub fn iter(&self) -> Iter<'_, LogEntry<T>> {
152        self.vec.iter()
153    }
154}
155
156impl<T: LogData, I: Iterator<Item = LogEntry<T>>> From<I> for Log<T> {
157    fn from(value: I) -> Self {
158        let mut log = Self::new();
159        for i in value {
160            log.push(i);
161        }
162        log
163    }
164}
165
166impl<T: LogData> IntoIterator for Log<T> {
167    type Item = LogEntry<T>;
168    type IntoIter = IntoIter<T>;
169
170    fn into_iter(self) -> Self::IntoIter {
171        IntoIter {
172            idx: 0,
173            vec: self.vec,
174        }
175    }
176}
177
178pub struct IntoIter<T: LogData> {
179    idx: usize,
180    vec: heapless::Vec<LogEntry<T>, LOG_LEN>,
181}
182
183impl<T: LogData> Iterator for IntoIter<T> {
184    type Item = LogEntry<T>;
185
186    fn next(&mut self) -> Option<Self::Item> {
187        let current = self.vec.get(self.idx);
188        self.idx += 1;
189        current.cloned()
190    }
191}
192
193#[derive(Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
194pub struct LogEntry<T: LogData> {
195    pub term: Term,
196    pub data: Option<T>,
197    pub config_change: Option<UpdateClusterVec>,
198    pub rule: Option<Rule>,
199}
200
201impl<T: LogData> LogEntry<T> {
202    pub fn new(term: Term, data: Option<T>, config_change: Option<UpdateClusterVec>, rule: Option<Rule>, ) -> Self {
203        Self {
204            term,
205            data,
206            config_change,
207            rule,
208        }
209    }
210    pub fn with_data(term: Term, data: Option<T>) -> Self {
211        Self {
212            term,
213            data,
214            config_change: None,
215            rule: None,
216        }
217    }
218
219    pub fn with_rule(term: Term, rule: Rule) -> Self {
220        Self {
221            term,
222            data: None,
223            config_change: None,
224            rule: Some(rule),
225        }
226    }
227
228    pub fn with_config(term: Term, config_change: Option<UpdateClusterVec>) -> Self {
229        Self {
230            term,
231            data: None,
232            config_change,
233            rule: None,
234        }
235    }
236}
237
238#[derive(Copy, Clone, Eq, PartialEq, Debug, Serialize, Deserialize)]
239pub struct RequestVoteArgs {
240    pub term: Term,
241    pub prev_log_index: LogIndex,
242    pub prev_log_term: Term,
243}
244
245#[derive(Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
246pub struct AppendLogArgs<T: LogData> {
247    pub term: Term,
248    pub prev_log_index: LogIndex,
249    pub prev_log_term: Term,
250    pub entries: Log<T>,
251    pub leader_commit: LogIndex,
252}
253
254#[derive(Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
255pub enum RaftMessage<T>
256where
257    T: LogData,
258{
259    RequestVote(RequestVoteArgs),
260    RequestVoteResponse(RequestVoteResponseResult),
261    AppendLog(AppendLogArgs<T>),
262    AppendLogResponse(AppendLogResponseResult),
263    ReadRequest,
264    ReadRequestReply(ReadRequestReplyArgs<T>),
265    WriteRequest(WriteRequestArgs),
266    WriteRequestReply(bool, Option<SystemNodeId>),
267    ConfigChange(UpdateClusterVec),
268    ConfigChangeACK(bool),
269    InstallSnapshot(InstallSnapshotArgs<T>),
270    InstallSnapshotResponse(Term),
271}
272
273impl<T: LogData> Display for RaftMessage<T> {
274    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
275        match self {
276            RaftMessage::RequestVote(args) => {
277                f.write_fmt(format_args!("RequestVote term: {}", args.term))
278            }
279            RaftMessage::RequestVoteResponse(args) => {
280                let granted = if args.granted {"granted"} else {"rejected"};
281                f.write_fmt(format_args!("RequestVoteResponse {}", granted))
282            }
283            RaftMessage::AppendLog(args) => {
284                f.write_fmt(format_args!("AppendLog term: {} index: {}", args.term, args.prev_log_index + 1))
285            }
286            RaftMessage::AppendLogResponse(args) => {
287                let granted = if args.success {"granted"} else {"rejected"};
288                f.write_fmt(format_args!("AppendLogResponse {}", granted))
289            }
290            RaftMessage::ReadRequest => {
291                f.write_fmt(format_args!("ReadRequest"))
292            }
293            RaftMessage::ReadRequestReply(args) => {
294                let granted = if args.success {"granted"} else {"rejected"};
295                f.write_fmt(format_args!("ReadRequestReply {}", granted))
296            }
297            RaftMessage::WriteRequest(args) => {
298                let value = args.measurement.map(|m| m.value.to_string()).unwrap_or_default();
299                let measurement_type = args.measurement.map(|m| m.typed.to_string()).unwrap_or_default();
300                f.write_fmt(format_args!("WriteRequest value: {} type: {:?}", value, measurement_type))
301            }
302            RaftMessage::WriteRequestReply(success, _) => {
303                let granted = if *success {"granted"} else {"rejected"};
304                f.write_fmt(format_args!("WriteRequestReply {}", granted))
305            }
306            RaftMessage::ConfigChange(args) => {
307                let prefix = f.write_str("ConfigChange new cluster: ");
308                let cluster_ids = args.iter().map(|id| f.write_fmt(format_args!("{id} "))).collect();
309                prefix.and(cluster_ids)
310            }
311            RaftMessage::ConfigChangeACK(success) => {
312                let granted = if *success {"granted"} else {"rejected"};
313                f.write_fmt(format_args!("ConfigChangeACK {}", granted))
314            }
315            RaftMessage::InstallSnapshot(args) => {
316                f.write_fmt(format_args!("InstallSnapshot"))
317            }
318            RaftMessage::InstallSnapshotResponse(args) => {
319                f.write_fmt(format_args!("ConfigChangeACK"))
320            }
321        }
322    }
323}
324
325impl<T: Clone + Debug + Default + Serialize + DeserializeOwned + LogData> Default
326    for RaftMessage<T>
327{
328    fn default() -> Self {
329        RaftMessage::ReadRequest
330    }
331}
332
333impl<T: LogData + Clone + Debug + Default + Serialize + DeserializeOwned> IdentificableMessage
334    for RaftMessage<T>
335{
336    fn id() -> u8 {
337        COORDINATION_MESSAGE_OPCODE
338    }
339}
340
341pub struct RaftPackageBuilder<T: LogData> {
342    to: Option<SystemNodeId>,
343    from: Option<SystemNodeId>,
344    body: Option<RaftMessage<T>>,
345}
346
347impl<T: LogData> Default for RaftPackageBuilder<T> {
348    fn default() -> Self {
349        Self {
350            to: None,
351            from: None,
352            body: None,
353        }
354    }
355}
356
357impl<T: LogData> PackageBuilder for RaftPackageBuilder<T> {
358    type NodeId = SystemNodeId;
359    type Message = RaftMessage<T>;
360
361    fn clean_copy(&self) -> Self {
362        Self::default()
363    }
364
365    fn to(mut self, id: SystemNodeId) -> Self {
366        self.to = Some(id);
367        self
368    }
369
370    fn from(mut self, id: SystemNodeId) -> Self {
371        self.from = Some(id);
372        self
373    }
374    fn with_message(mut self, msg: RaftMessage<T>) -> Self {
375        self.body = Some(msg);
376        self
377    }
378
379    fn respond_to(self, package: Package<Self::NodeId, Self::Message>) -> Self {
380        self.to(package.header.from)
381    }
382
383    fn build(self) -> Result<Package<Self::NodeId, Self::Message>, ()> {
384        Ok(Package {
385            header: Header {
386                from: self.from.ok_or(())?,
387                to: self.to.ok_or(())?,
388            },
389            body: self.body.ok_or(())?,
390        })
391    }
392}
393
394#[derive(Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
395pub struct InstallSnapshotArgs<T: LogData> {
396    pub term: Term,
397    pub leader_id: SystemNodeId,
398    pub last_included_index: LogIndex,
399    pub last_included_term: Term,
400    pub data: LogEntry<T>,
401}
402
403#[derive(Eq, PartialEq, Debug, Copy, Clone, Serialize, Deserialize)]
404pub struct ReadRequestReplyArgs<T> {
405    pub success: bool,
406    pub data: Option<T>,
407    pub redirect: Option<SystemNodeId>,
408}
409
410impl<T> ReadRequestReplyArgs<T> {
411    pub fn fail(leader_id: Option<SystemNodeId>) -> Self {
412        Self {
413            success: false,
414            data: None,
415            redirect: leader_id,
416        }
417    }
418}
419
420#[derive(Eq, PartialEq, Debug, Copy, Clone, Serialize, Deserialize)]
421pub struct RequestVoteResponseResult {
422    pub term: Term,
423    pub granted: bool,
424}
425
426#[derive(Eq, PartialEq, Debug, Copy, Clone, Serialize, Deserialize)]
427pub struct AppendLogResponseResult {
428    pub term: Term,
429    pub success: bool,
430}
431
432#[derive(Eq, PartialEq, Debug, Copy, Clone, Serialize, Deserialize)]
433pub struct WriteRequestArgs {
434    pub measurement: Option<Measurement>,
435    pub rule: Option<Rule>,
436    pub id: SystemNodeId,
437}
438
439impl WriteRequestArgs {
440    pub fn with_rule(id: SystemNodeId, rule: Rule) -> Self {
441        Self {
442            measurement: None,
443            rule: Some(rule),
444            id
445        }
446    }
447
448    pub fn with_measurement(id: SystemNodeId, measurement: Measurement) -> Self {
449        Self {
450            measurement: Some(measurement),
451            rule: None,
452            id
453        }
454    }
455}
456
#[cfg(test)]
mod log_tests {
    use crate::messages::{UpdateClusterVec, Log, LogEntry, Term, LOG_LEN};
    use crate::server::{Merge, NoOp};
    use dcs::nodes::SystemNodeId;
    use dcs::rules::measurements::{Measurement, SystemState};
    use serde::{Serialize, Serializer};
    use std::fmt::Debug;
    use dcs::rules::strategy::{Rule, RuleType};

    /// Minimal payload for the tests: merging two values adds the wrapped numbers.
    #[derive(Eq, PartialEq, Copy, Clone, Debug, Serialize)]
    struct TestLogData(usize);

    impl NoOp for TestLogData {
        fn noop() -> Self {
            Self(0)
        }
    }

    impl Merge for TestLogData {
        fn merge(self, rhs: Self) -> Self {
            Self(self.0 + rhs.0)
        }
    }

    impl From<(SystemNodeId, Measurement)> for TestLogData {
        fn from(_value: (SystemNodeId, Measurement)) -> Self {
            unreachable!()
        }
    }

    impl Into<SystemState> for TestLogData {
        fn into(self) -> SystemState {
            unreachable!()
        }
    }

    /// Appends every entry to `log`, failing the test if the log is full.
    fn push_all(
        log: &mut Log<TestLogData>,
        entries: impl IntoIterator<Item = LogEntry<TestLogData>>,
    ) {
        for item in entries {
            log.push(item).expect("Failed push");
        }
    }

    /// Builds a log holding exactly `entries`, in order.
    fn log_of(entries: impl IntoIterator<Item = LogEntry<TestLogData>>) -> Log<TestLogData> {
        let mut log = Log::new();
        push_all(&mut log, entries);
        log
    }

    /// Log holding `entry(1)`, `entry(2)` and `entry(3)`.
    fn one_two_three() -> Log<TestLogData> {
        log_of([entry(1), entry(2), entry(3)])
    }

    /// Log holding payloads 1, 2 and 3 with the given terms.
    fn with_terms(terms: [Term; 3]) -> Log<TestLogData> {
        log_of([
            LogEntry::with_data(terms[0], Some(TestLogData(1))),
            LogEntry::with_data(terms[1], Some(TestLogData(2))),
            LogEntry::with_data(terms[2], Some(TestLogData(3))),
        ])
    }

    #[test]
    pub fn new_log_is_empty() {
        let log = Log::<TestLogData>::new();
        assert!(log.is_empty())
    }

    #[test]
    pub fn can_insert() {
        let log = log_of([some_entry()]);

        assert!(!log.is_empty())
    }

    #[test]
    #[should_panic]
    pub fn zero_is_not_valid_index_when_getting() {
        let mut log = Log::<TestLogData>::new();
        log.push(some_entry()).expect("Failed push");

        let _ = log.get(0);
    }

    #[test]
    #[should_panic]
    pub fn zero_is_not_valid_index_when_inserting() {
        let mut log = Log::<TestLogData>::new();

        // The insertion itself must be what panics; the previous version
        // inserted at 1 and only panicked on a later `get(0)`.
        log.insert(0, some_entry());
    }

    #[test]
    pub fn one_is_the_first_index() {
        let log = log_of([some_entry()]);

        let result = log.get(1).cloned();
        assert_eq!(Some(some_entry()), result);
    }

    #[test]
    pub fn when_inserting_in_position_greater_than_length_then_is_the_same_as_pushing() {
        let mut log = Log::<TestLogData>::new();
        log.insert(32, some_entry());

        assert!(log.get(32).is_none());
        assert!(log.get(1).is_some());
    }

    #[test]
    pub fn when_inserting_in_existing_position_replaces_data() {
        let mut log = one_two_three();

        log.insert(2, entry(42));

        assert_eq!(&entry(3), log.get(3).unwrap());
        assert_eq!(&entry(42), log.get(2).unwrap());
        assert_eq!(&entry(1), log.get(1).unwrap());
    }

    #[test]
    #[should_panic]
    pub fn zero_is_not_valid_index_for_snapshot() {
        let mut log = Log::<TestLogData>::new();
        log.snapshot(0);
    }

    #[test]
    pub fn can_calculate_log_capacity() {
        let log = one_two_three();

        assert!((1.0 - (3.0 / LOG_LEN as f32) - log.capacity()).abs() < 0.0001);
    }

    #[test]
    pub fn snapshot_empty_log_leaves_everything_untouched() {
        let mut log = Log::<TestLogData>::new();
        log.snapshot(1);

        push_all(&mut log, [entry(1), entry(2), entry(3)]);

        assert_eq!(&entry(3), log.get(3).unwrap());
        assert_eq!(&entry(2), log.get(2).unwrap());
        assert_eq!(&entry(1), log.get(1).unwrap());
    }

    #[test]
    pub fn snapshot_log_with_one_element_leaves_everything_untouched() {
        let mut log = log_of([entry(1)]);

        log.snapshot(1);

        push_all(&mut log, [entry(2), entry(3)]);
        assert_eq!(&entry(3), log.get(3).unwrap());
        assert_eq!(&entry(2), log.get(2).unwrap());
        assert_eq!(&entry(1), log.get(1).unwrap());
    }

    #[test]
    pub fn can_get_value_not_snapshotted() {
        let mut log = one_two_three();

        log.snapshot(2);

        assert_eq!(&entry(3), log.get(3).unwrap());
    }

    #[test]
    pub fn all_snapshotted_values_are_the_same() {
        let mut log = one_two_three();

        log.snapshot(3);

        assert_eq!(log.get(1).unwrap(), log.get(2).unwrap());
        assert_eq!(log.get(2).unwrap(), log.get(3).unwrap());
    }

    #[test]
    pub fn can_check_if_entry_was_snapshotted() {
        let mut log = one_two_three();

        log.snapshot(2);

        assert!(log.is_snapshot(2));
        assert!(!log.is_snapshot(3));
    }

    #[test]
    pub fn can_get_last_included_index() {
        let mut log = one_two_three();

        log.snapshot(2);

        assert_eq!(2, log.last_included_index());
    }

    #[test]
    pub fn can_get_last_included_index_after_multiple_snapshots() {
        let mut log = one_two_three();

        log.snapshot(3);

        push_all(&mut log, [entry(4), entry(5), entry(6)]);

        log.snapshot(6);

        assert_eq!(6, log.last_included_index());
        assert_eq!(&entry(21), log.get(6).unwrap());
    }

    #[test]
    pub fn snapshot_is_the_merge_of_entries_up_to_idx() {
        let mut log = one_two_three();

        log.snapshot(3);

        assert_eq!(&entry(6), log.get(1).unwrap());
    }

    #[test]
    pub fn different_indices_generate_different_snapshot_data() {
        let mut log = one_two_three();

        let mut log2 = log.clone();
        log.snapshot(3);
        log2.snapshot(2);

        assert_ne!(log.get(1), log2.get(1));
        assert_eq!(&entry(6), log.get(1).unwrap());
        assert_eq!(&entry(3), log2.get(1).unwrap());
    }

    #[test]
    pub fn the_snapshot_term_is_the_greater_term_of_all_entries() {
        let mut log = log_of([
            entry_with_term(0, 0),
            entry_with_term(1, 0),
            entry_with_term(2, 0),
        ]);

        log.snapshot(3);

        assert_eq!(&entry_with_term(2, 0), log.get(1).unwrap());
    }

    #[test]
    pub fn different_indices_generate_different_snapshot_term() {
        let mut log = log_of([
            entry_with_term(0, 1),
            entry_with_term(1, 2),
            entry_with_term(2, 3),
        ]);

        let mut log2 = log.clone();
        log.snapshot(3);
        log2.snapshot(2);

        assert_eq!(&entry_with_term(2, 6), log.get(1).unwrap());
        assert_eq!(&entry_with_term(1, 3), log2.get(1).unwrap());
    }

    #[test]
    pub fn the_snapshot_change_request_is_the_latest_change_req() {
        let mut log = log_of([
            config_change(&[1]),
            config_change(&[1, 2]),
            config_change(&[1, 2, 3]),
        ]);

        log.snapshot(3);

        assert_eq!(&config_change(&[1, 2, 3]), log.get(1).unwrap());
    }

    fn update_cluster_vector(slice: Vec<u32>) -> Option<UpdateClusterVec> {
        let mut cluster = UpdateClusterVec::new();
        for elem in slice {
            cluster.push(SystemNodeId::from(elem));
        }
        Some(cluster)
    }

    #[test]
    pub fn entries_with_data_config_change_and_rule() {
        let mut log = log_of([
            LogEntry::new(0, Some(TestLogData::noop()), update_cluster_vector(vec![1, 2]), Some(rule(1))),
            LogEntry::new(1, Some(TestLogData(2)), None, None),
            LogEntry::new(2, None, update_cluster_vector(vec![1, 2, 3]), Some(rule(2))),
            LogEntry::new(3, None, None, None),
        ]);

        log.snapshot(4);

        let expected_snapshot = LogEntry::new(
            3,
            Some(TestLogData(2)),
            update_cluster_vector(vec![1, 2, 3]),
            Some(rule(2)),
        );
        assert_eq!(&expected_snapshot, log.get(1).unwrap());
    }

    fn config_change(config: &[u32]) -> LogEntry<TestLogData> {
        LogEntry::with_config(0, update_cluster_vector(config.to_vec()))
    }

    #[test]
    pub fn snapshot_several_times() {
        let mut log = log_of([
            LogEntry::new(1, Some(TestLogData::noop()), update_cluster_vector(vec![1, 2]), None),
            LogEntry::new(2, Some(TestLogData(2)), None, None),
            LogEntry::new(3, None, update_cluster_vector(vec![1, 2, 3]), None),
            LogEntry::new(4, None, None, None),
        ]);

        log.snapshot(4);

        push_all(
            &mut log,
            [
                LogEntry::new(5, Some(TestLogData(3)), None, None),
                LogEntry::new(6, None, update_cluster_vector(vec![1, 2, 3, 4]), None),
                LogEntry::new(7, None, None, None),
                LogEntry::new(8, Some(TestLogData(5)), None, None),
            ],
        );

        log.snapshot(7);

        let expected_data = Some(TestLogData(5));
        let expected_config = update_cluster_vector(vec![1, 2, 3, 4]);
        let expected_snapshot = LogEntry::new(7, expected_data, expected_config, None);
        assert_eq!(&expected_snapshot, log.get(1).unwrap());
        assert_eq!(
            &LogEntry::new(8, Some(TestLogData(5)), None, None),
            log.get(8).unwrap()
        );
    }

    #[test]
    pub fn snapshot_log_increases_capacity() {
        let mut log = one_two_three();

        let initial_capacity = log.capacity();

        log.snapshot(2);

        let final_capacity = log.capacity();
        assert!(initial_capacity < final_capacity);
    }

    #[test]
    pub fn more_elements_in_snapshot_means_more_capacity_gained() {
        let mut log = one_two_three();

        let mut log2 = log.clone();

        log.snapshot(2);
        log2.snapshot(3);

        assert!(log.capacity() < log2.capacity());
    }

    #[test]
    pub fn if_no_snapshot_then_last_included_term_is_zero() {
        let log = with_terms([1, 1, 1]);

        assert_eq!(0, log.last_included_term());
    }

    #[test]
    pub fn if_snapshot_then_last_included_term_is_given() {
        let mut log = with_terms([1, 2, 3]);

        log.snapshot(3);

        assert_eq!(3, log.last_included_term());
    }

    #[test]
    pub fn install_snapshot() {
        let mut log = with_terms([1, 2, 3]);

        log.install_snapshot(2, LogEntry::with_data(2, Some(TestLogData(3))));

        assert_eq!(2, log.len());
        assert!(log.is_snapshot(1));
        assert!(log.is_snapshot(2));
        assert!(!log.is_snapshot(3));
    }

    #[test]
    pub fn append_after_install_snapshot() {
        let mut log = with_terms([1, 2, 3]);
        log.install_snapshot(2, LogEntry::with_data(2, Some(TestLogData(3))));

        push_all(&mut log, [LogEntry::with_data(4, Some(TestLogData(4)))]);

        assert_eq!(
            &LogEntry::with_data(4, Some(TestLogData(4))),
            log.get(4).unwrap()
        )
    }

    #[test]
    pub fn multiple_install_snapshot() {
        let mut log = with_terms([1, 2, 3]);

        log.install_snapshot(2, LogEntry::with_data(2, Some(TestLogData(3))));

        push_all(
            &mut log,
            [
                LogEntry::with_data(4, Some(TestLogData(4))),
                LogEntry::with_data(5, Some(TestLogData(5))),
                LogEntry::with_data(6, Some(TestLogData(6))),
            ],
        );

        log.install_snapshot(5, LogEntry::with_data(5, Some(TestLogData(15))));

        assert_eq!(
            &LogEntry::with_data(5, Some(TestLogData(15))),
            log.get(5).unwrap()
        );
        assert_eq!(
            &LogEntry::with_data(6, Some(TestLogData(6))),
            log.get(6).unwrap()
        )
    }

    fn some_entry() -> LogEntry<TestLogData> {
        entry(1)
    }

    /// Entry with term 0 carrying `value` as payload.
    fn entry(value: usize) -> LogEntry<TestLogData> {
        LogEntry::with_data(0, Some(TestLogData(value)))
    }

    fn rule(threshold: i32) -> Rule {
        Rule {
            name: RuleType::MeanOver,
            threshold,
        }
    }

    fn entry_with_term(term: Term, value: usize) -> LogEntry<TestLogData> {
        LogEntry::with_data(term, Some(TestLogData(value)))
    }
}