1use std::cmp::{max, min};
2use std::fmt::{Debug, Write};
3use std::slice::Iter;
4use log::{debug, error};
5
6use serde::de::DeserializeOwned;
7
8use dcs::communication::messages::{UpdateClusterVec, COORDINATION_MESSAGE_OPCODE, Header, IdentificableMessage, Package, PackageBuilder};
9use dcs::heapless;
10use dcs::nodes::SystemNodeId;
11use dcs::properties::MEASUREMENTS_MAX_COUNT;
12use dcs::rules::measurements::{Measurement, SystemState};
13use dcs::rules::strategy::Rule;
14
15use crate::server::{LogData, Merge, NoOp};
16
17use super::*;
18
/// Term number attached to log entries and to the vote/append messages below.
pub type Term = u16;
/// Logical position of an entry in a [`Log`]; the accessors assert it is greater than zero.
pub type LogIndex = u32;

/// Capacity of the backing vector of a [`Log`], i.e. the maximum number of stored entries.
pub const LOG_LEN: usize = CLUSTER_NODE_COUNT * MEASUREMENTS_MAX_COUNT / 3;
23
/// Bounded log of [`LogEntry`] values addressed by logical [`LogIndex`].
///
/// Logical indices are translated to positions in `vec` by subtracting
/// `index_shift` (saturating at zero).
#[derive(Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
pub struct Log<T: LogData> {
    /// Backing storage, limited to `LOG_LEN` entries.
    vec: heapless::Vec<LogEntry<T>, LOG_LEN>,
    /// Logical index that maps onto `vec[0]`; starts at 1 and is advanced when
    /// entries are compacted into a snapshot.
    index_shift: u32,
}
29
30impl<T: LogData> Default for Log<T> {
31 fn default() -> Self {
32 Self {
33 vec: Default::default(),
34 index_shift: 1,
35 }
36 }
37}
38
39impl<T: LogData> Log<T> {
40 pub fn new() -> Log<T> {
41 Self {
42 vec: Default::default(),
43 index_shift: 1,
44 }
45 }
46 pub fn noop() -> Log<T> {
47 Self::new()
48 }
49
50 pub fn push(&mut self, data: LogEntry<T>) -> Result<(), LogEntry<T>> {
51 self.vec.push(data)
52 }
53
54 pub fn insert(&mut self, idx: LogIndex, data: LogEntry<T>) {
55 assert!(idx > 0);
56 let idx: usize = self.shift_idx(idx);
57 if idx < self.len() {
58 self.vec[idx] = data
59 } else {
60 if self.vec.push(data).is_err() {
61 error!("Couldn't append element to log: max capacity of {} reached.", self.vec.len())
62 };
63 }
64 }
65
66 pub fn is_snapshot(&self, idx: LogIndex) -> bool {
67 idx <= self.index_shift
68 }
69
70 pub fn last_included_index(&self) -> u32 {
71 self.index_shift
72 }
73
74 pub fn last_included_term(&self) -> Term {
75 if self.index_shift > 1 {
76 self.vec.first().unwrap().term
77 } else {
78 0
79 }
80 }
81
82 pub fn snapshot(&mut self, idx: LogIndex) {
83 assert!(idx > 0);
84 if idx < 2 {
85 return;
86 }
87 let snapshot_count = min(self.shift_idx(idx) as u32 + 1, self.len() as u32);
88 let mut data_accum = None::<T>;
89 let mut term_accum = Term::default();
90 let mut rule_accum = None;
91 let mut config_change_accum = None;
92 for _ in 0..snapshot_count {
93 let entry = self.vec.remove(0);
94 data_accum = match (data_accum, entry.data) {
95 (Some(acc), Some(d)) => Some(acc.merge(d)),
96 (acc, d) => acc.or(d),
97 };
98 term_accum = max(entry.term, term_accum);
99 config_change_accum = entry.config_change.or(config_change_accum);
100 rule_accum = entry.rule.or(rule_accum);
101 }
102 self.vec.insert(0, LogEntry::new(term_accum, data_accum, config_change_accum, rule_accum));
103 self.index_shift += snapshot_count.checked_sub(1).unwrap_or_default();
104 }
105
106 pub fn install_snapshot(&mut self, last_included_index: LogIndex, entry: LogEntry<T>) {
107 let snapshot_count = min(
108 self.shift_idx(last_included_index) as u32 + 1,
109 self.len() as u32,
110 );
111 for _ in 0..snapshot_count {
112 self.vec.remove(0);
113 }
114 self.index_shift = last_included_index;
115 self.vec.insert(0, entry);
116 }
117
118 pub fn capacity(&self) -> f32 {
119 1.0 - (self.len() as f32 / LOG_LEN as f32)
120 }
121
122 fn shift_idx(&self, idx: LogIndex) -> usize {
123 idx.checked_sub(self.index_shift).unwrap_or_default() as usize
124 }
125
126 pub fn pop(&mut self) -> Option<LogEntry<T>> {
127 self.vec.pop()
128 }
129
130 pub fn get(&self, idx: LogIndex) -> Option<&LogEntry<T>> {
131 assert!(idx > 0);
132 self.vec.get(self.shift_idx(idx))
133 }
134
135 pub fn len(&self) -> usize {
136 self.vec.len()
137 }
138
139 pub fn last_index(&self) -> usize {
140 (self.vec.len() + self.index_shift as usize).checked_sub(1).unwrap_or_default()
141 }
142
143 pub fn is_empty(&self) -> bool {
144 self.vec.is_empty()
145 }
146
147 pub fn last(&self) -> Option<&LogEntry<T>> {
148 self.vec.last()
149 }
150
151 pub fn iter(&self) -> Iter<'_, LogEntry<T>> {
152 self.vec.iter()
153 }
154}
155
156impl<T: LogData, I: Iterator<Item = LogEntry<T>>> From<I> for Log<T> {
157 fn from(value: I) -> Self {
158 let mut log = Self::new();
159 for i in value {
160 log.push(i);
161 }
162 log
163 }
164}
165
166impl<T: LogData> IntoIterator for Log<T> {
167 type Item = LogEntry<T>;
168 type IntoIter = IntoIter<T>;
169
170 fn into_iter(self) -> Self::IntoIter {
171 IntoIter {
172 idx: 0,
173 vec: self.vec,
174 }
175 }
176}
177
/// Owning iterator over the entries of a [`Log`].
pub struct IntoIter<T: LogData> {
    /// Position of the next entry to yield.
    idx: usize,
    /// Entries taken over from the consumed log.
    vec: heapless::Vec<LogEntry<T>, LOG_LEN>,
}
182
183impl<T: LogData> Iterator for IntoIter<T> {
184 type Item = LogEntry<T>;
185
186 fn next(&mut self) -> Option<Self::Item> {
187 let current = self.vec.get(self.idx);
188 self.idx += 1;
189 current.cloned()
190 }
191}
192
/// A single log record: a term plus up to three optional payloads.
#[derive(Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
pub struct LogEntry<T: LogData> {
    /// Term associated with the entry.
    pub term: Term,
    /// Application data carried by the entry, if any.
    pub data: Option<T>,
    /// Cluster membership update carried by the entry, if any.
    pub config_change: Option<UpdateClusterVec>,
    /// Rule carried by the entry, if any.
    pub rule: Option<Rule>,
}
200
201impl<T: LogData> LogEntry<T> {
202 pub fn new(term: Term, data: Option<T>, config_change: Option<UpdateClusterVec>, rule: Option<Rule>, ) -> Self {
203 Self {
204 term,
205 data,
206 config_change,
207 rule,
208 }
209 }
210 pub fn with_data(term: Term, data: Option<T>) -> Self {
211 Self {
212 term,
213 data,
214 config_change: None,
215 rule: None,
216 }
217 }
218
219 pub fn with_rule(term: Term, rule: Rule) -> Self {
220 Self {
221 term,
222 data: None,
223 config_change: None,
224 rule: Some(rule),
225 }
226 }
227
228 pub fn with_config(term: Term, config_change: Option<UpdateClusterVec>) -> Self {
229 Self {
230 term,
231 data: None,
232 config_change,
233 rule: None,
234 }
235 }
236}
237
/// Payload of [`RaftMessage::RequestVote`].
#[derive(Copy, Clone, Eq, PartialEq, Debug, Serialize, Deserialize)]
pub struct RequestVoteArgs {
    /// Term of the sender.
    pub term: Term,
    // NOTE(review): presumably the index of the sender's last log entry — confirm with the caller.
    pub prev_log_index: LogIndex,
    // NOTE(review): presumably the term of the sender's last log entry — confirm with the caller.
    pub prev_log_term: Term,
}
244
/// Payload of [`RaftMessage::AppendLog`].
#[derive(Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
pub struct AppendLogArgs<T: LogData> {
    /// Term of the sender.
    pub term: Term,
    /// Index preceding the transmitted entries (the `Display` impl reports `prev_log_index + 1`).
    pub prev_log_index: LogIndex,
    // NOTE(review): presumably the term of the entry at `prev_log_index` — confirm with the caller.
    pub prev_log_term: Term,
    /// Entries being transmitted, packed as a [`Log`].
    pub entries: Log<T>,
    // NOTE(review): presumably the sender's commit index — confirm with the caller.
    pub leader_commit: LogIndex,
}
253
/// Every message kind exchanged by the coordination layer; all of them share
/// the `COORDINATION_MESSAGE_OPCODE` identifier.
#[derive(Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
pub enum RaftMessage<T>
where
    T: LogData,
{
    /// Vote request.
    RequestVote(RequestVoteArgs),
    /// Answer to a vote request.
    RequestVoteResponse(RequestVoteResponseResult),
    /// Transmission of log entries.
    AppendLog(AppendLogArgs<T>),
    /// Answer to an `AppendLog`.
    AppendLogResponse(AppendLogResponseResult),
    /// Request to read data; carries no payload.
    ReadRequest,
    /// Answer to a `ReadRequest`.
    ReadRequestReply(ReadRequestReplyArgs<T>),
    /// Request to write a measurement or a rule.
    WriteRequest(WriteRequestArgs),
    /// Answer to a `WriteRequest`: success flag plus an optional node id
    /// (presumably where to retry — confirm with the sender).
    WriteRequestReply(bool, Option<SystemNodeId>),
    /// Request to change the cluster membership.
    ConfigChange(UpdateClusterVec),
    /// Answer to a `ConfigChange`, carrying a success flag.
    ConfigChangeACK(bool),
    /// Transmission of a snapshot entry.
    InstallSnapshot(InstallSnapshotArgs<T>),
    /// Answer to an `InstallSnapshot`, carrying a term.
    InstallSnapshotResponse(Term),
}
272
273impl<T: LogData> Display for RaftMessage<T> {
274 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
275 match self {
276 RaftMessage::RequestVote(args) => {
277 f.write_fmt(format_args!("RequestVote term: {}", args.term))
278 }
279 RaftMessage::RequestVoteResponse(args) => {
280 let granted = if args.granted {"granted"} else {"rejected"};
281 f.write_fmt(format_args!("RequestVoteResponse {}", granted))
282 }
283 RaftMessage::AppendLog(args) => {
284 f.write_fmt(format_args!("AppendLog term: {} index: {}", args.term, args.prev_log_index + 1))
285 }
286 RaftMessage::AppendLogResponse(args) => {
287 let granted = if args.success {"granted"} else {"rejected"};
288 f.write_fmt(format_args!("AppendLogResponse {}", granted))
289 }
290 RaftMessage::ReadRequest => {
291 f.write_fmt(format_args!("ReadRequest"))
292 }
293 RaftMessage::ReadRequestReply(args) => {
294 let granted = if args.success {"granted"} else {"rejected"};
295 f.write_fmt(format_args!("ReadRequestReply {}", granted))
296 }
297 RaftMessage::WriteRequest(args) => {
298 let value = args.measurement.map(|m| m.value.to_string()).unwrap_or_default();
299 let measurement_type = args.measurement.map(|m| m.typed.to_string()).unwrap_or_default();
300 f.write_fmt(format_args!("WriteRequest value: {} type: {:?}", value, measurement_type))
301 }
302 RaftMessage::WriteRequestReply(success, _) => {
303 let granted = if *success {"granted"} else {"rejected"};
304 f.write_fmt(format_args!("WriteRequestReply {}", granted))
305 }
306 RaftMessage::ConfigChange(args) => {
307 let prefix = f.write_str("ConfigChange new cluster: ");
308 let cluster_ids = args.iter().map(|id| f.write_fmt(format_args!("{id} "))).collect();
309 prefix.and(cluster_ids)
310 }
311 RaftMessage::ConfigChangeACK(success) => {
312 let granted = if *success {"granted"} else {"rejected"};
313 f.write_fmt(format_args!("ConfigChangeACK {}", granted))
314 }
315 RaftMessage::InstallSnapshot(args) => {
316 f.write_fmt(format_args!("InstallSnapshot"))
317 }
318 RaftMessage::InstallSnapshotResponse(args) => {
319 f.write_fmt(format_args!("ConfigChangeACK"))
320 }
321 }
322 }
323}
324
325impl<T: Clone + Debug + Default + Serialize + DeserializeOwned + LogData> Default
326 for RaftMessage<T>
327{
328 fn default() -> Self {
329 RaftMessage::ReadRequest
330 }
331}
332
333impl<T: LogData + Clone + Debug + Default + Serialize + DeserializeOwned> IdentificableMessage
334 for RaftMessage<T>
335{
336 fn id() -> u8 {
337 COORDINATION_MESSAGE_OPCODE
338 }
339}
340
/// Step-by-step builder for a `Package` carrying a [`RaftMessage`].
///
/// All three parts must be set before `build` succeeds.
pub struct RaftPackageBuilder<T: LogData> {
    /// Destination node, once set.
    to: Option<SystemNodeId>,
    /// Originating node, once set.
    from: Option<SystemNodeId>,
    /// Message to send, once set.
    body: Option<RaftMessage<T>>,
}
346
347impl<T: LogData> Default for RaftPackageBuilder<T> {
348 fn default() -> Self {
349 Self {
350 to: None,
351 from: None,
352 body: None,
353 }
354 }
355}
356
357impl<T: LogData> PackageBuilder for RaftPackageBuilder<T> {
358 type NodeId = SystemNodeId;
359 type Message = RaftMessage<T>;
360
361 fn clean_copy(&self) -> Self {
362 Self::default()
363 }
364
365 fn to(mut self, id: SystemNodeId) -> Self {
366 self.to = Some(id);
367 self
368 }
369
370 fn from(mut self, id: SystemNodeId) -> Self {
371 self.from = Some(id);
372 self
373 }
374 fn with_message(mut self, msg: RaftMessage<T>) -> Self {
375 self.body = Some(msg);
376 self
377 }
378
379 fn respond_to(self, package: Package<Self::NodeId, Self::Message>) -> Self {
380 self.to(package.header.from)
381 }
382
383 fn build(self) -> Result<Package<Self::NodeId, Self::Message>, ()> {
384 Ok(Package {
385 header: Header {
386 from: self.from.ok_or(())?,
387 to: self.to.ok_or(())?,
388 },
389 body: self.body.ok_or(())?,
390 })
391 }
392}
393
/// Payload of [`RaftMessage::InstallSnapshot`].
#[derive(Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
pub struct InstallSnapshotArgs<T: LogData> {
    /// Term of the sender.
    pub term: Term,
    /// Node id of the leader sending the snapshot.
    pub leader_id: SystemNodeId,
    /// Last log index covered by the snapshot.
    pub last_included_index: LogIndex,
    /// Term associated with `last_included_index`.
    pub last_included_term: Term,
    /// The snapshot itself, packed as a single log entry.
    pub data: LogEntry<T>,
}
402
/// Payload of [`RaftMessage::ReadRequestReply`].
#[derive(Eq, PartialEq, Debug, Copy, Clone, Serialize, Deserialize)]
pub struct ReadRequestReplyArgs<T> {
    /// Whether the read was served.
    pub success: bool,
    /// Data returned by the read, if any.
    pub data: Option<T>,
    /// Optional node id to retry against; `fail` fills it with the leader id it is given.
    pub redirect: Option<SystemNodeId>,
}
409
410impl<T> ReadRequestReplyArgs<T> {
411 pub fn fail(leader_id: Option<SystemNodeId>) -> Self {
412 Self {
413 success: false,
414 data: None,
415 redirect: leader_id,
416 }
417 }
418}
419
/// Payload of [`RaftMessage::RequestVoteResponse`].
#[derive(Eq, PartialEq, Debug, Copy, Clone, Serialize, Deserialize)]
pub struct RequestVoteResponseResult {
    /// Term reported by the responder.
    pub term: Term,
    /// Whether the vote was granted.
    pub granted: bool,
}
425
/// Payload of [`RaftMessage::AppendLogResponse`].
#[derive(Eq, PartialEq, Debug, Copy, Clone, Serialize, Deserialize)]
pub struct AppendLogResponseResult {
    /// Term reported by the responder.
    pub term: Term,
    /// Whether the append was accepted.
    pub success: bool,
}
431
/// Payload of [`RaftMessage::WriteRequest`]; the provided constructors set
/// exactly one of `measurement` and `rule`.
#[derive(Eq, PartialEq, Debug, Copy, Clone, Serialize, Deserialize)]
pub struct WriteRequestArgs {
    /// Measurement to write, if this is a measurement request.
    pub measurement: Option<Measurement>,
    /// Rule to write, if this is a rule request.
    pub rule: Option<Rule>,
    // NOTE(review): presumably the id of the node issuing the request — confirm with the caller.
    pub id: SystemNodeId,
}
438
439impl WriteRequestArgs {
440 pub fn with_rule(id: SystemNodeId, rule: Rule) -> Self {
441 Self {
442 measurement: None,
443 rule: Some(rule),
444 id
445 }
446 }
447
448 pub fn with_measurement(id: SystemNodeId, measurement: Measurement) -> Self {
449 Self {
450 measurement: Some(measurement),
451 rule: None,
452 id
453 }
454 }
455}
456
// Unit tests for `Log`: 1-based indexing, insertion, snapshot compaction,
// capacity accounting and snapshot installation.
#[cfg(test)]
mod log_tests {
    use crate::messages::{UpdateClusterVec, Log, LogEntry, Term, LOG_LEN};
    use crate::server::{Merge, NoOp};
    use dcs::nodes::SystemNodeId;
    use dcs::rules::measurements::{Measurement, SystemState};
    use serde::{Serialize, Serializer};
    use std::fmt::Debug;
    use dcs::rules::strategy::{Rule, RuleType};

    /// Minimal log payload: merging two values adds them up.
    #[derive(Eq, PartialEq, Copy, Clone, Debug, Serialize)]
    struct TestLogData(usize);

    impl NoOp for TestLogData {
        fn noop() -> Self {
            Self(0)
        }
    }

    impl Merge for TestLogData {
        fn merge(self, rhs: Self) -> Self {
            Self(self.0 + rhs.0)
        }
    }

    // The two conversions below are never exercised by these tests.
    impl From<(SystemNodeId, Measurement)> for TestLogData {
        fn from(value: (SystemNodeId, Measurement)) -> Self {
            unreachable!()
        }
    }

    impl Into<SystemState> for TestLogData {
        fn into(self) -> SystemState {
            unreachable!()
        }
    }

    #[test]
    pub fn new_log_is_empty() {
        let log = Log::<TestLogData>::new();
        assert!(log.is_empty())
    }

    #[test]
    pub fn can_insert() {
        let mut log = Log::<TestLogData>::new();
        let data = some_entry();
        log.push(data);

        assert!(!log.is_empty())
    }

    #[test]
    #[should_panic]
    pub fn zero_is_not_valid_index_when_getting() {
        let mut log = Log::<TestLogData>::new();
        log.push(some_entry()).expect("Failed push");

        let _ = log.get(0);
    }

    #[test]
    #[should_panic]
    pub fn zero_is_not_valid_index_when_inserting() {
        let mut log = Log::<TestLogData>::new();
        // The panic must come from `insert` itself, not from a later `get(0)`.
        log.insert(0, some_entry());
    }

    #[test]
    pub fn one_is_the_first_index() {
        let mut log = Log::<TestLogData>::new();
        log.push(some_entry()).expect("Failed push");

        let result = log.get(1).cloned();
        assert_eq!(Some(some_entry()), result);
    }

    #[test]
    pub fn when_inserting_in_position_greater_than_length_then_is_the_same_as_pushing() {
        let mut log = Log::<TestLogData>::new();
        log.insert(32, some_entry());

        assert!(log.get(32).is_none());
        assert!(log.get(1).is_some());
    }

    #[test]
    pub fn when_inserting_in_existing_position_replaces_data() {
        let mut log = Log::<TestLogData>::new();
        log.push(entry(1));
        log.push(entry(2));
        log.push(entry(3));

        log.insert(2, entry(42));

        assert_eq!(&entry(3), log.get(3).unwrap());
        assert_eq!(&entry(42), log.get(2).unwrap());
        assert_eq!(&entry(1), log.get(1).unwrap());
    }

    #[test]
    #[should_panic]
    pub fn zero_is_not_valid_index_for_snapshot() {
        let mut log = Log::<TestLogData>::new();
        log.snapshot(0);
    }

    #[test]
    pub fn can_calculate_log_capacity() {
        let mut log = Log::<TestLogData>::new();
        log.push(entry(1));
        log.push(entry(2));
        log.push(entry(3));

        assert!((1.0 - (3.0 / LOG_LEN as f32) - log.capacity()).abs() < 0.0001);
    }

    #[test]
    pub fn snapshot_empty_log_leaves_everything_untouched() {
        let mut log = Log::<TestLogData>::new();
        log.snapshot(1);

        log.push(entry(1));
        log.push(entry(2));
        log.push(entry(3));

        assert_eq!(&entry(3), log.get(3).unwrap());
        assert_eq!(&entry(2), log.get(2).unwrap());
        assert_eq!(&entry(1), log.get(1).unwrap());
    }

    #[test]
    pub fn snapshot_log_with_one_element_leaves_everything_untouched() {
        let mut log = Log::<TestLogData>::new();
        log.push(entry(1));

        log.snapshot(1);

        log.push(entry(2));
        log.push(entry(3));
        assert_eq!(&entry(3), log.get(3).unwrap());
        assert_eq!(&entry(2), log.get(2).unwrap());
        assert_eq!(&entry(1), log.get(1).unwrap());
    }

    #[test]
    pub fn can_get_value_not_snapshotted() {
        let mut log = Log::<TestLogData>::new();
        log.push(entry(1));
        log.push(entry(2));
        log.push(entry(3));

        log.snapshot(2);

        assert_eq!(&entry(3), log.get(3).unwrap());
    }

    #[test]
    pub fn all_snapshotted_values_are_the_same() {
        let mut log = Log::<TestLogData>::new();
        log.push(entry(1));
        log.push(entry(2));
        log.push(entry(3));

        log.snapshot(3);

        assert_eq!(log.get(1).unwrap(), log.get(2).unwrap());
        assert_eq!(log.get(2).unwrap(), log.get(3).unwrap());
    }

    #[test]
    pub fn can_check_if_entry_was_snapshotted() {
        let mut log = Log::<TestLogData>::new();
        log.push(entry(1));
        log.push(entry(2));
        log.push(entry(3));

        log.snapshot(2);

        assert!(log.is_snapshot(2));
        assert!(!log.is_snapshot(3));
    }

    #[test]
    pub fn can_get_last_included_index() {
        let mut log = Log::<TestLogData>::new();
        log.push(entry(1));
        log.push(entry(2));
        log.push(entry(3));

        log.snapshot(2);

        assert_eq!(2, log.last_included_index());
    }

    #[test]
    pub fn can_get_last_included_index_after_multiple_snapshots() {
        let mut log = Log::<TestLogData>::new();
        log.push(entry(1));
        log.push(entry(2));
        log.push(entry(3));

        log.snapshot(3);

        log.push(entry(4));
        log.push(entry(5));
        log.push(entry(6));

        log.snapshot(6);

        assert_eq!(6, log.last_included_index());
        // 1 + 2 + 3 + 4 + 5 + 6 merged into the snapshot entry.
        assert_eq!(&entry(21), log.get(6).unwrap());
    }

    #[test]
    pub fn snapshot_is_the_merge_of_entries_up_to_idx() {
        let mut log = Log::<TestLogData>::new();
        log.push(entry(1));
        log.push(entry(2));
        log.push(entry(3));

        log.snapshot(3);

        assert_eq!(&entry(6), log.get(1).unwrap());
    }

    #[test]
    pub fn different_indices_generate_different_snapshot_data() {
        let mut log = Log::<TestLogData>::new();
        log.push(entry(1));
        log.push(entry(2));
        log.push(entry(3));

        let mut log2 = log.clone();
        log.snapshot(3);
        log2.snapshot(2);

        assert_ne!(log.get(1), log2.get(1));
        assert_eq!(&entry(6), log.get(1).unwrap());
        assert_eq!(&entry(3), log2.get(1).unwrap());
    }

    #[test]
    pub fn the_snapshot_term_is_the_greater_term_of_all_entries() {
        let mut log = Log::<TestLogData>::new();
        log.push(entry_with_term(0, 0));
        log.push(entry_with_term(1, 0));
        log.push(entry_with_term(2, 0));

        log.snapshot(3);

        assert_eq!(&entry_with_term(2, 0), log.get(1).unwrap());
    }

    #[test]
    pub fn different_indices_generate_different_snapshot_term() {
        let mut log = Log::<TestLogData>::new();
        log.push(entry_with_term(0, 1));
        log.push(entry_with_term(1, 2));
        log.push(entry_with_term(2, 3));

        let mut log2 = log.clone();
        log.snapshot(3);
        log2.snapshot(2);

        assert_eq!(&entry_with_term(2, 6), log.get(1).unwrap());
        assert_eq!(&entry_with_term(1, 3), log2.get(1).unwrap());
    }

    #[test]
    pub fn the_snapshot_change_request_is_the_latest_change_req() {
        let mut log = Log::<TestLogData>::new();
        log.push(config_change(&[1]));
        log.push(config_change(&[1, 2]));
        log.push(config_change(&[1, 2, 3]));

        log.snapshot(3);

        assert_eq!(&config_change(&[1, 2, 3]), log.get(1).unwrap());
    }

    /// Builds a cluster update holding one node id per element of `slice`.
    fn update_cluster_vector(slice: Vec<u32>) -> Option<UpdateClusterVec> {
        let mut cluster = UpdateClusterVec::new();
        for elem in slice {
            cluster.push(SystemNodeId::from(elem));
        }
        Some(cluster)
    }

    #[test]
    pub fn entries_with_data_config_change_and_rule() {
        let mut log = Log::<TestLogData>::new();
        log.push(LogEntry::new(0, Some(TestLogData::noop()), update_cluster_vector(vec![1, 2]), Some(rule(1))));
        log.push(LogEntry::new(1, Some(TestLogData(2)), None, None));
        log.push(LogEntry::new(2, None, update_cluster_vector(vec![1, 2, 3]), Some(rule(2))));
        log.push(LogEntry::new(3, None, None, None));

        log.snapshot(4);

        let expected_snapshot = LogEntry::new(
            3,
            Some(TestLogData(2)),
            update_cluster_vector(vec![1, 2, 3]),
            Some(rule(2)),
        );
        assert_eq!(&expected_snapshot, log.get(1).unwrap());
    }

    /// Entry of term 0 that only carries the given cluster configuration.
    fn config_change(config: &[u32]) -> LogEntry<TestLogData> {
        LogEntry::with_config(0, update_cluster_vector(config.to_vec()))
    }

    #[test]
    pub fn snapshot_several_times() {
        let mut log = Log::<TestLogData>::new();
        log.push(LogEntry::new(1, Some(TestLogData::noop()), update_cluster_vector(vec![1, 2]), None));
        log.push(LogEntry::new(2, Some(TestLogData(2)), None, None));
        log.push(LogEntry::new(3, None, update_cluster_vector(vec![1, 2, 3]), None));
        log.push(LogEntry::new(4, None, None, None));

        log.snapshot(4);

        log.push(LogEntry::new(5, Some(TestLogData(3)), None, None));
        log.push(LogEntry::new(6, None, update_cluster_vector(vec![1, 2, 3, 4]), None));
        log.push(LogEntry::new(7, None, None, None));
        log.push(LogEntry::new(8, Some(TestLogData(5)), None, None));

        log.snapshot(7);

        let expected_data = Some(TestLogData(5));
        let expected_config = update_cluster_vector(vec![1, 2, 3, 4]);
        let expected_snapshot = LogEntry::new(7, expected_data, expected_config, None);
        assert_eq!(&expected_snapshot, log.get(1).unwrap());
        assert_eq!(
            &LogEntry::new(8, Some(TestLogData(5)), None, None),
            log.get(8).unwrap()
        );
    }

    #[test]
    pub fn snapshot_log_increases_capacity() {
        let mut log = Log::<TestLogData>::new();
        log.push(entry(1));
        log.push(entry(2));
        log.push(entry(3));

        let initial_capacity = log.capacity();

        log.snapshot(2);

        let final_capacity = log.capacity();
        assert!(initial_capacity < final_capacity);
    }

    #[test]
    pub fn more_elements_in_snapshot_means_more_capacity_gained() {
        let mut log = Log::<TestLogData>::new();
        log.push(entry(1));
        log.push(entry(2));
        log.push(entry(3));

        let mut log2 = log.clone();

        log.snapshot(2);
        log2.snapshot(3);

        assert!(log.capacity() < log2.capacity());
    }

    #[test]
    pub fn if_no_snapshot_then_last_included_term_is_zero() {
        let mut log = Log::<TestLogData>::new();
        log.push(LogEntry::with_data(1, Some(TestLogData(1))));
        log.push(LogEntry::with_data(1, Some(TestLogData(2))));
        log.push(LogEntry::with_data(1, Some(TestLogData(3))));

        assert_eq!(0, log.last_included_term());
    }

    #[test]
    pub fn if_snapshot_then_last_included_term_is_given() {
        let mut log = Log::<TestLogData>::new();
        log.push(LogEntry::with_data(1, Some(TestLogData(1))));
        log.push(LogEntry::with_data(2, Some(TestLogData(2))));
        log.push(LogEntry::with_data(3, Some(TestLogData(3))));

        log.snapshot(3);

        assert_eq!(3, log.last_included_term());
    }

    #[test]
    pub fn install_snapshot() {
        let mut log = Log::<TestLogData>::new();
        log.push(LogEntry::with_data(1, Some(TestLogData(1))));
        log.push(LogEntry::with_data(2, Some(TestLogData(2))));
        log.push(LogEntry::with_data(3, Some(TestLogData(3))));

        log.install_snapshot(2, LogEntry::with_data(2, Some(TestLogData(3))));

        assert_eq!(2, log.len());
        assert!(log.is_snapshot(1));
        assert!(log.is_snapshot(2));
        assert!(!log.is_snapshot(3));
    }

    #[test]
    pub fn append_after_install_snapshot() {
        let mut log = Log::<TestLogData>::new();
        log.push(LogEntry::with_data(1, Some(TestLogData(1))));
        log.push(LogEntry::with_data(2, Some(TestLogData(2))));
        log.push(LogEntry::with_data(3, Some(TestLogData(3))));
        log.install_snapshot(2, LogEntry::with_data(2, Some(TestLogData(3))));

        log.push(LogEntry::with_data(4, Some(TestLogData(4))));

        assert_eq!(
            &LogEntry::with_data(4, Some(TestLogData(4))),
            log.get(4).unwrap()
        )
    }

    #[test]
    pub fn multiple_install_snapshot() {
        let mut log = Log::<TestLogData>::new();
        log.push(LogEntry::with_data(1, Some(TestLogData(1))));
        log.push(LogEntry::with_data(2, Some(TestLogData(2))));
        log.push(LogEntry::with_data(3, Some(TestLogData(3))));

        log.install_snapshot(2, LogEntry::with_data(2, Some(TestLogData(3))));

        log.push(LogEntry::with_data(4, Some(TestLogData(4))));
        log.push(LogEntry::with_data(5, Some(TestLogData(5))));
        log.push(LogEntry::with_data(6, Some(TestLogData(6))));

        log.install_snapshot(5, LogEntry::with_data(5, Some(TestLogData(15))));

        assert_eq!(
            &LogEntry::with_data(5, Some(TestLogData(15))),
            log.get(5).unwrap()
        );
        assert_eq!(
            &LogEntry::with_data(6, Some(TestLogData(6))),
            log.get(6).unwrap()
        )
    }

    /// Arbitrary entry for tests that do not care about the payload.
    fn some_entry() -> LogEntry<TestLogData> {
        entry(1)
    }

    /// Entry of term 0 carrying `value`.
    fn entry(value: usize) -> LogEntry<TestLogData> {
        LogEntry::with_data(0, Some(TestLogData(value)))
    }

    /// `MeanOver` rule with the given threshold.
    fn rule(threshold: i32) -> Rule {
        Rule {
            name: RuleType::MeanOver,
            threshold,
        }
    }

    /// Entry of the given term carrying `value`.
    fn entry_with_term(term: u16, value: usize) -> LogEntry<TestLogData> {
        LogEntry::with_data(term as Term, Some(TestLogData(value)))
    }
}