1use std::cmp;
24use std::sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard};
25
26use crate::eraftpb::*;
27
28use crate::errors::{Error, Result, StorageError};
29use crate::util::limit_size;
30
31use getset::{Getters, Setters};
32
/// The Raft state kept by a storage: a `HardState` paired with a `ConfState`.
#[derive(Debug, Clone, Default, Getters, Setters)]
pub struct RaftState {
    /// The stored hard state; `MemStorageCore` reads and writes its `commit` and `term` fields.
    pub hard_state: HardState,

    /// The stored configuration state. `RaftState::initialized` treats a value equal to
    /// `ConfState::default()` as "not yet initialized".
    pub conf_state: ConfState,
}
44
45impl RaftState {
46 pub fn new(hard_state: HardState, conf_state: ConfState) -> RaftState {
48 RaftState {
49 hard_state,
50 conf_state,
51 }
52 }
53 pub fn initialized(&self) -> bool {
55 self.conf_state != ConfState::default()
56 }
57}
58
/// Context value passed to `Storage::entries`.
///
/// It wraps the crate-private `GetEntriesFor` reason, so code outside the crate can only
/// build it through `GetEntriesContext::empty`.
#[derive(Debug)]
pub struct GetEntriesContext(pub(crate) GetEntriesFor);
62
63impl GetEntriesContext {
64 pub fn empty(can_async: bool) -> Self {
66 GetEntriesContext(GetEntriesFor::Empty(can_async))
67 }
68
69 pub fn can_async(&self) -> bool {
71 match self.0 {
72 GetEntriesFor::SendAppend { .. } => true,
73 GetEntriesFor::Empty(can_async) => can_async,
74 _ => false,
75 }
76 }
77}
78
/// The reason a batch of entries is requested from storage.
///
/// `GetEntriesContext::can_async` reports `true` for `SendAppend`, the carried flag for
/// `Empty`, and `false` for all remaining variants.
#[derive(Debug)]
pub(crate) enum GetEntriesFor {
    // NOTE(review): presumably used when fetching entries to send in an append message;
    // the call sites are not visible in this file.
    SendAppend {
        // Presumably the id of the peer the entries are destined for — confirm at call site.
        to: u64,
        // Presumably the term at the time the request was issued — confirm at call site.
        term: u64,
        // Meaning not visible in this file — confirm at call site.
        aggressively: bool,
    },
    // NOTE(review): presumably used while generating a ready — confirm at call site.
    GenReady,
    // NOTE(review): presumably used while handling a leader transfer — confirm at call site.
    TransferLeader,
    // NOTE(review): presumably used when commit is advanced by vote messages — confirm.
    CommitByVote,
    // Built by `GetEntriesContext::empty`; the bool is the value `can_async` returns.
    Empty(bool),
}
99
/// Read access to the Raft state, log entries and snapshots held by a storage backend.
///
/// Every method returns a `Result` so an implementation can report failures. The notes on
/// individual methods describe what the in-file `MemStorage` implementation does; other
/// implementations are presumably expected to behave the same way — confirm against the
/// crate documentation.
pub trait Storage {
    /// Returns the stored `RaftState` (hard state plus configuration state).
    fn initial_state(&self) -> Result<RaftState>;

    /// Returns log entries for the index range bounded by `low` and `high`.
    ///
    /// `max_size` converts into an optional size limit and `context` carries the reason
    /// for the request. `MemStorage` treats the range as `[low, high)`, fails with
    /// `StorageError::Compacted` when `low` is below its first index, panics when `high`
    /// exceeds its last index plus one, and may fail with
    /// `StorageError::LogTemporarilyUnavailable` when `context.can_async()` is true.
    fn entries(
        &self,
        low: u64,
        high: u64,
        max_size: impl Into<Option<u64>>,
        context: GetEntriesContext,
    ) -> Result<Vec<Entry>>;

    /// Returns the term of the entry at `idx`.
    ///
    /// `MemStorage` also answers for the index of its snapshot metadata, and reports
    /// `StorageError::Compacted` / `StorageError::Unavailable` for indexes below / above
    /// the range it holds.
    fn term(&self, idx: u64) -> Result<u64>;

    /// Returns the index of the first entry that can be read. `MemStorage` answers with the
    /// snapshot index plus one when it holds no entries.
    fn first_index(&self) -> Result<u64>;

    /// Returns the index of the last entry. `MemStorage` answers with the snapshot index
    /// when it holds no entries.
    fn last_index(&self) -> Result<u64>;

    /// Returns a snapshot.
    ///
    /// `MemStorage` raises the returned snapshot's metadata index to `request_index` when it
    /// would otherwise be lower, ignores `to`, and can be made to fail once with
    /// `StorageError::SnapshotTemporarilyUnavailable`.
    fn snapshot(&self, request_index: u64, to: u64) -> Result<Snapshot>;
}
161
/// The data behind `MemStorage`: Raft state, the in-memory log, the metadata of the last
/// applied snapshot, and switches that make `MemStorage` report temporary unavailability.
#[derive(Default)]
pub struct MemStorageCore {
    // Hard state and configuration state returned by `Storage::initial_state`.
    raft_state: RaftState,
    // Log entries currently held; `entries[i]` is addressed as `first_index() + i`.
    entries: Vec<Entry>,
    // Metadata taken from the snapshot passed to `apply_snapshot`.
    snapshot_metadata: SnapshotMetadata,
    // When set, the next `Storage::snapshot` call fails with
    // `SnapshotTemporarilyUnavailable` and clears the flag.
    trigger_snap_unavailable: bool,
    // When set, `Storage::entries` fails with `LogTemporarilyUnavailable` for every
    // request whose context reports `can_async()`.
    trigger_log_unavailable: bool,
    // Context of the most recent `entries` request rejected as temporarily unavailable;
    // handed out by `take_get_entries_context`.
    get_entries_context: Option<GetEntriesContext>,
}
179
180impl MemStorageCore {
181 pub fn set_hardstate(&mut self, hs: HardState) {
183 self.raft_state.hard_state = hs;
184 }
185
186 pub fn hard_state(&self) -> &HardState {
188 &self.raft_state.hard_state
189 }
190
191 pub fn mut_hard_state(&mut self) -> &mut HardState {
193 &mut self.raft_state.hard_state
194 }
195
196 pub fn commit_to(&mut self, index: u64) -> Result<()> {
202 assert!(
203 self.has_entry_at(index),
204 "commit_to {} but the entry does not exist",
205 index
206 );
207
208 let diff = (index - self.entries[0].index) as usize;
209 self.raft_state.hard_state.commit = index;
210 self.raft_state.hard_state.term = self.entries[diff].term;
211 Ok(())
212 }
213
214 pub fn set_conf_state(&mut self, cs: ConfState) {
216 self.raft_state.conf_state = cs;
217 }
218
219 #[inline]
220 fn has_entry_at(&self, index: u64) -> bool {
221 !self.entries.is_empty() && index >= self.first_index() && index <= self.last_index()
222 }
223
224 fn first_index(&self) -> u64 {
225 match self.entries.first() {
226 Some(e) => e.index,
227 None => self.snapshot_metadata.index + 1,
228 }
229 }
230
231 fn last_index(&self) -> u64 {
232 match self.entries.last() {
233 Some(e) => e.index,
234 None => self.snapshot_metadata.index,
235 }
236 }
237
238 pub fn apply_snapshot(&mut self, mut snapshot: Snapshot) -> Result<()> {
244 let mut meta = snapshot.take_metadata();
245 let index = meta.index;
246
247 if self.first_index() > index {
248 return Err(Error::Store(StorageError::SnapshotOutOfDate));
249 }
250
251 self.snapshot_metadata = meta.clone();
252
253 self.raft_state.hard_state.term = cmp::max(self.raft_state.hard_state.term, meta.term);
254 self.raft_state.hard_state.commit = index;
255 self.entries.clear();
256
257 self.raft_state.conf_state = meta.take_conf_state();
259 Ok(())
260 }
261
262 fn snapshot(&self) -> Snapshot {
263 let mut snapshot = Snapshot::default();
264
265 let meta = snapshot.mut_metadata();
269 meta.index = self.raft_state.hard_state.commit;
270 meta.term = match meta.index.cmp(&self.snapshot_metadata.index) {
271 cmp::Ordering::Equal => self.snapshot_metadata.term,
272 cmp::Ordering::Greater => {
273 let offset = self.entries[0].index;
274 self.entries[(meta.index - offset) as usize].term
275 }
276 cmp::Ordering::Less => {
277 panic!(
278 "commit {} < snapshot_metadata.index {}",
279 meta.index, self.snapshot_metadata.index
280 );
281 }
282 };
283
284 meta.set_conf_state(self.raft_state.conf_state.clone());
285 snapshot
286 }
287
288 pub fn compact(&mut self, compact_index: u64) -> Result<()> {
296 if compact_index <= self.first_index() {
297 return Ok(());
299 }
300
301 if compact_index > self.last_index() + 1 {
302 panic!(
303 "compact not received raft logs: {}, last index: {}",
304 compact_index,
305 self.last_index()
306 );
307 }
308
309 if let Some(entry) = self.entries.first() {
310 let offset = compact_index - entry.index;
311 self.entries.drain(..offset as usize);
312 }
313 Ok(())
314 }
315
316 pub fn append(&mut self, ents: &[Entry]) -> Result<()> {
323 if ents.is_empty() {
324 return Ok(());
325 }
326 if self.first_index() > ents[0].index {
327 panic!(
328 "overwrite compacted raft logs, compacted: {}, append: {}",
329 self.first_index() - 1,
330 ents[0].index,
331 );
332 }
333 if self.last_index() + 1 < ents[0].index {
334 panic!(
335 "raft logs should be continuous, last index: {}, new appended: {}",
336 self.last_index(),
337 ents[0].index,
338 );
339 }
340
341 let diff = ents[0].index - self.first_index();
343 self.entries.drain(diff as usize..);
344 self.entries.extend_from_slice(ents);
345 Ok(())
346 }
347
348 pub fn commit_to_and_set_conf_states(&mut self, idx: u64, cs: Option<ConfState>) -> Result<()> {
350 self.commit_to(idx)?;
351 if let Some(cs) = cs {
352 self.raft_state.conf_state = cs;
353 }
354 Ok(())
355 }
356
357 pub fn trigger_snap_unavailable(&mut self) {
359 self.trigger_snap_unavailable = true;
360 }
361
362 pub fn trigger_log_unavailable(&mut self, v: bool) {
364 self.trigger_log_unavailable = v;
365 }
366
367 pub fn take_get_entries_context(&mut self) -> Option<GetEntriesContext> {
369 self.get_entries_context.take()
370 }
371}
372
/// An in-memory `Storage` implementation.
///
/// The actual data lives in a `MemStorageCore` behind `Arc<RwLock<..>>`, so cloning a
/// `MemStorage` yields another handle to the same core. Use `rl()` / `wl()` to obtain a
/// read or write guard on the core.
#[derive(Clone, Default)]
pub struct MemStorage {
    // Shared, lock-protected core holding the state and the log.
    core: Arc<RwLock<MemStorageCore>>,
}
384
385impl MemStorage {
386 pub fn new() -> MemStorage {
388 MemStorage {
389 ..Default::default()
390 }
391 }
392
393 pub fn new_with_conf_state<T>(conf_state: T) -> MemStorage
398 where
399 ConfState: From<T>,
400 {
401 let store = MemStorage::new();
402 store.initialize_with_conf_state(conf_state);
403 store
404 }
405
406 pub fn initialize_with_conf_state<T>(&self, conf_state: T)
410 where
411 ConfState: From<T>,
412 {
413 assert!(!self.initial_state().unwrap().initialized());
414 let mut core = self.wl();
415 core.raft_state.conf_state = ConfState::from(conf_state);
422 }
423
424 pub fn rl(&self) -> RwLockReadGuard<'_, MemStorageCore> {
427 self.core.read().unwrap()
428 }
429
430 pub fn wl(&self) -> RwLockWriteGuard<'_, MemStorageCore> {
433 self.core.write().unwrap()
434 }
435}
436
437impl Storage for MemStorage {
438 fn initial_state(&self) -> Result<RaftState> {
440 Ok(self.rl().raft_state.clone())
441 }
442
443 fn entries(
445 &self,
446 low: u64,
447 high: u64,
448 max_size: impl Into<Option<u64>>,
449 context: GetEntriesContext,
450 ) -> Result<Vec<Entry>> {
451 let max_size = max_size.into();
452 let mut core = self.wl();
453 if low < core.first_index() {
454 return Err(Error::Store(StorageError::Compacted));
455 }
456
457 if high > core.last_index() + 1 {
458 panic!(
459 "index out of bound (last: {}, high: {})",
460 core.last_index() + 1,
461 high
462 );
463 }
464
465 if core.trigger_log_unavailable && context.can_async() {
466 core.get_entries_context = Some(context);
467 return Err(Error::Store(StorageError::LogTemporarilyUnavailable));
468 }
469
470 let offset = core.entries[0].index;
471 let lo = (low - offset) as usize;
472 let hi = (high - offset) as usize;
473 let mut ents = core.entries[lo..hi].to_vec();
474 limit_size(&mut ents, max_size);
475 Ok(ents)
476 }
477
478 fn term(&self, idx: u64) -> Result<u64> {
480 let core = self.rl();
481 if idx == core.snapshot_metadata.index {
482 return Ok(core.snapshot_metadata.term);
483 }
484
485 let offset = core.first_index();
486 if idx < offset {
487 return Err(Error::Store(StorageError::Compacted));
488 }
489
490 if idx > core.last_index() {
491 return Err(Error::Store(StorageError::Unavailable));
492 }
493 Ok(core.entries[(idx - offset) as usize].term)
494 }
495
496 fn first_index(&self) -> Result<u64> {
498 Ok(self.rl().first_index())
499 }
500
501 fn last_index(&self) -> Result<u64> {
503 Ok(self.rl().last_index())
504 }
505
506 fn snapshot(&self, request_index: u64, _to: u64) -> Result<Snapshot> {
508 let mut core = self.wl();
509 if core.trigger_snap_unavailable {
510 core.trigger_snap_unavailable = false;
511 Err(Error::Store(StorageError::SnapshotTemporarilyUnavailable))
512 } else {
513 let mut snap = core.snapshot();
514 if snap.get_metadata().index < request_index {
515 snap.mut_metadata().index = request_index;
516 }
517 Ok(snap)
518 }
519 }
520}
521
/// Unit tests for `MemStorage` and `MemStorageCore`.
#[cfg(test)]
mod test {
    use std::panic::{self, AssertUnwindSafe};

    use protobuf::Message as PbMessage;

    use crate::eraftpb::{ConfState, Entry, Snapshot};
    use crate::errors::{Error as RaftError, StorageError};

    use super::{GetEntriesContext, MemStorage, Storage};

    /// Builds an entry carrying only an index and a term.
    fn new_entry(index: u64, term: u64) -> Entry {
        let mut e = Entry::default();
        e.term = term;
        e.index = index;
        e
    }

    /// Encoded size of a protobuf message, used to build size limits for `entries`.
    fn size_of<T: PbMessage>(m: &T) -> u32 {
        m.compute_size()
    }

    /// Builds a snapshot whose metadata carries the given index, term and voters.
    fn new_snapshot(index: u64, term: u64, voters: Vec<u64>) -> Snapshot {
        let mut s = Snapshot::default();
        s.mut_metadata().index = index;
        s.mut_metadata().term = term;
        s.mut_metadata().mut_conf_state().voters = voters;
        s
    }

    #[test]
    fn test_storage_term() {
        let ents = vec![new_entry(3, 3), new_entry(4, 4), new_entry(5, 5)];
        // (queried index, expected result)
        let mut tests = vec![
            (2, Err(RaftError::Store(StorageError::Compacted))),
            (3, Ok(3)),
            (4, Ok(4)),
            (5, Ok(5)),
            (6, Err(RaftError::Store(StorageError::Unavailable))),
        ];

        for (i, (idx, wterm)) in tests.drain(..).enumerate() {
            let storage = MemStorage::new();
            storage.wl().entries = ents.clone();

            let t = storage.term(idx);
            if t != wterm {
                panic!("#{}: expect res {:?}, got {:?}", i, wterm, t);
            }
        }
    }

    #[test]
    fn test_storage_entries() {
        let ents = vec![
            new_entry(3, 3),
            new_entry(4, 4),
            new_entry(5, 5),
            new_entry(6, 6),
        ];
        let max_u64 = u64::max_value();
        // (low, high, max size, expected result)
        let mut tests = vec![
            (
                2,
                6,
                max_u64,
                Err(RaftError::Store(StorageError::Compacted)),
            ),
            (3, 4, max_u64, Ok(vec![new_entry(3, 3)])),
            (4, 5, max_u64, Ok(vec![new_entry(4, 4)])),
            (4, 6, max_u64, Ok(vec![new_entry(4, 4), new_entry(5, 5)])),
            (
                4,
                7,
                max_u64,
                Ok(vec![new_entry(4, 4), new_entry(5, 5), new_entry(6, 6)]),
            ),
            // A zero limit still yields the first entry.
            (4, 7, 0, Ok(vec![new_entry(4, 4)])),
            // Limits that leave room for exactly two entries, or two and a part of the third.
            (
                4,
                7,
                u64::from(size_of(&ents[1]) + size_of(&ents[2])),
                Ok(vec![new_entry(4, 4), new_entry(5, 5)]),
            ),
            (
                4,
                7,
                u64::from(size_of(&ents[1]) + size_of(&ents[2]) + size_of(&ents[3]) / 2),
                Ok(vec![new_entry(4, 4), new_entry(5, 5)]),
            ),
            (
                4,
                7,
                u64::from(size_of(&ents[1]) + size_of(&ents[2]) + size_of(&ents[3]) - 1),
                Ok(vec![new_entry(4, 4), new_entry(5, 5)]),
            ),
            // A limit that fits all three entries exactly.
            (
                4,
                7,
                u64::from(size_of(&ents[1]) + size_of(&ents[2]) + size_of(&ents[3])),
                Ok(vec![new_entry(4, 4), new_entry(5, 5), new_entry(6, 6)]),
            ),
        ];
        for (i, (lo, hi, maxsize, wentries)) in tests.drain(..).enumerate() {
            let storage = MemStorage::new();
            storage.wl().entries = ents.clone();
            let e = storage.entries(lo, hi, maxsize, GetEntriesContext::empty(false));
            if e != wentries {
                panic!("#{}: expect entries {:?}, got {:?}", i, wentries, e);
            }
        }
    }

    #[test]
    fn test_storage_last_index() {
        let ents = vec![new_entry(3, 3), new_entry(4, 4), new_entry(5, 5)];
        let storage = MemStorage::new();
        storage.wl().entries = ents;

        let wresult = Ok(5);
        let result = storage.last_index();
        if result != wresult {
            panic!("want {:?}, got {:?}", wresult, result);
        }

        storage.wl().append(&[new_entry(6, 5)]).unwrap();
        let wresult = Ok(6);
        let result = storage.last_index();
        if result != wresult {
            panic!("want {:?}, got {:?}", wresult, result);
        }
    }

    #[test]
    fn test_storage_first_index() {
        let ents = vec![new_entry(3, 3), new_entry(4, 4), new_entry(5, 5)];
        let storage = MemStorage::new();
        storage.wl().entries = ents;

        assert_eq!(storage.first_index(), Ok(3));
        storage.wl().compact(4).unwrap();
        assert_eq!(storage.first_index(), Ok(4));
    }

    #[test]
    fn test_storage_compact() {
        let ents = vec![new_entry(3, 3), new_entry(4, 4), new_entry(5, 5)];
        // (compact index, expected first index, expected first term, expected entry count)
        let mut tests = vec![(2, 3, 3, 3), (3, 3, 3, 3), (4, 4, 4, 2), (5, 5, 5, 1)];
        for (i, (idx, windex, wterm, wlen)) in tests.drain(..).enumerate() {
            let storage = MemStorage::new();
            storage.wl().entries = ents.clone();

            storage.wl().compact(idx).unwrap();
            let index = storage.first_index().unwrap();
            if index != windex {
                panic!("#{}: want {}, index {}", i, windex, index);
            }
            let term = if let Ok(v) =
                storage.entries(index, index + 1, 1, GetEntriesContext::empty(false))
            {
                v.first().map_or(0, |e| e.term)
            } else {
                0
            };
            if term != wterm {
                panic!("#{}: want {}, term {}", i, wterm, term);
            }
            let last = storage.last_index().unwrap();
            let len = storage
                .entries(index, last + 1, 100, GetEntriesContext::empty(false))
                .unwrap()
                .len();
            if len != wlen {
                // This compares entry counts, so the message reports a length, not a term.
                panic!("#{}: want {}, len {}", i, wlen, len);
            }
        }
    }

    #[test]
    fn test_storage_create_snapshot() {
        let ents = vec![new_entry(3, 3), new_entry(4, 4), new_entry(5, 5)];
        let nodes = vec![1, 2, 3];
        let mut conf_state = ConfState::default();
        conf_state.voters = nodes.clone();

        let unavailable = Err(RaftError::Store(
            StorageError::SnapshotTemporarilyUnavailable,
        ));
        // (commit index and term, expected result, requested index)
        let mut tests = vec![
            (4, Ok(new_snapshot(4, 4, nodes.clone())), 0),
            (5, Ok(new_snapshot(5, 5, nodes.clone())), 5),
            (5, Ok(new_snapshot(6, 5, nodes)), 6),
            (5, unavailable, 6),
        ];
        for (i, (idx, wresult, windex)) in tests.drain(..).enumerate() {
            let storage = MemStorage::new();
            storage.wl().entries = ents.clone();
            storage.wl().raft_state.hard_state.commit = idx;
            storage.wl().raft_state.hard_state.term = idx;
            storage.wl().raft_state.conf_state = conf_state.clone();

            if wresult.is_err() {
                storage.wl().trigger_snap_unavailable();
            }

            let result = storage.snapshot(windex, 0);
            if result != wresult {
                panic!("#{}: want {:?}, got {:?}", i, wresult, result);
            }
        }
    }

    #[test]
    fn test_storage_append() {
        let ents = vec![new_entry(3, 3), new_entry(4, 4), new_entry(5, 5)];
        // (entries to append, expected log afterwards; `None` means the append must panic)
        let mut tests = vec![
            (
                vec![new_entry(3, 3), new_entry(4, 4), new_entry(5, 5)],
                Some(vec![new_entry(3, 3), new_entry(4, 4), new_entry(5, 5)]),
            ),
            (
                vec![new_entry(3, 3), new_entry(4, 6), new_entry(5, 6)],
                Some(vec![new_entry(3, 3), new_entry(4, 6), new_entry(5, 6)]),
            ),
            (
                vec![
                    new_entry(3, 3),
                    new_entry(4, 4),
                    new_entry(5, 5),
                    new_entry(6, 5),
                ],
                Some(vec![
                    new_entry(3, 3),
                    new_entry(4, 4),
                    new_entry(5, 5),
                    new_entry(6, 5),
                ]),
            ),
            // Starting below the first held index overwrites compacted entries: must panic.
            (
                vec![new_entry(2, 3), new_entry(3, 3), new_entry(4, 5)],
                None,
            ),
            // Starting inside the log drops the held entries from that index onwards.
            (
                vec![new_entry(4, 5)],
                Some(vec![new_entry(3, 3), new_entry(4, 5)]),
            ),
            // Starting right after the last held index extends the log.
            (
                vec![new_entry(6, 6)],
                Some(vec![
                    new_entry(3, 3),
                    new_entry(4, 4),
                    new_entry(5, 5),
                    new_entry(6, 6),
                ]),
            ),
        ];
        for (i, (entries, wentries)) in tests.drain(..).enumerate() {
            let storage = MemStorage::new();
            storage.wl().entries = ents.clone();
            let res = panic::catch_unwind(AssertUnwindSafe(|| storage.wl().append(&entries)));
            if let Some(wentries) = wentries {
                let _ = res.unwrap();
                let e = &storage.wl().entries;
                if *e != wentries {
                    panic!("#{}: want {:?}, entries {:?}", i, wentries, e);
                }
            } else {
                res.unwrap_err();
            }
        }
    }

    #[test]
    fn test_storage_apply_snapshot() {
        let nodes = vec![1, 2, 3];
        let storage = MemStorage::new();

        // Applying a snapshot to an empty storage succeeds.
        let snap = new_snapshot(4, 4, nodes.clone());
        storage.wl().apply_snapshot(snap).unwrap();

        // A snapshot older than the one already applied is rejected.
        let snap = new_snapshot(3, 3, nodes);
        storage.wl().apply_snapshot(snap).unwrap_err();
    }
}