// raft/storage.rs

1//! Represents the storage trait and example implementation.
2//!
3//! The storage trait is used to house and eventually serialize the state of the system.
4//! Custom implementations of this are normal and this is likely to be a key integration
5//! point for your distributed storage.
6
7// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
8
9// Copyright 2015 The etcd Authors
10//
11// Licensed under the Apache License, Version 2.0 (the "License");
12// you may not use this file except in compliance with the License.
13// You may obtain a copy of the License at
14//
15//     http://www.apache.org/licenses/LICENSE-2.0
16//
17// Unless required by applicable law or agreed to in writing, software
18// distributed under the License is distributed on an "AS IS" BASIS,
19// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20// See the License for the specific language governing permissions and
21// limitations under the License.
22
23use std::cmp;
24use std::sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard};
25
26use crate::eraftpb::*;
27
28use crate::errors::{Error, Result, StorageError};
29use crate::util::limit_size;
30
31use getset::{Getters, Setters};
32
/// Holds both the hard state (commit index, vote leader, term) and the configuration state
/// (current node IDs).
#[derive(Debug, Clone, Default, Getters, Setters)]
pub struct RaftState {
    /// Contains the last meta information including commit index, the vote leader, and the vote term.
    pub hard_state: HardState,

    /// Records the current node IDs like `[1, 2, 3]` in the cluster. Every Raft node must have a
    /// unique ID in the cluster.
    pub conf_state: ConfState,
}
44
45impl RaftState {
46    /// Create a new RaftState.
47    pub fn new(hard_state: HardState, conf_state: ConfState) -> RaftState {
48        RaftState {
49            hard_state,
50            conf_state,
51        }
52    }
53    /// Indicates the `RaftState` is initialized or not.
54    pub fn initialized(&self) -> bool {
55        self.conf_state != ConfState::default()
56    }
57}
58
/// Records the context of the caller who calls entries() of Storage trait.
///
/// The wrapped `GetEntriesFor` identifies which raft flow triggered the fetch,
/// which in turn determines whether asynchronous fetching is supported.
#[derive(Debug)]
pub struct GetEntriesContext(pub(crate) GetEntriesFor);
62
63impl GetEntriesContext {
64    /// Used for callers out of raft. Caller can customize if it supports async.
65    pub fn empty(can_async: bool) -> Self {
66        GetEntriesContext(GetEntriesFor::Empty(can_async))
67    }
68
69    /// Check if the caller's context support fetching entries asynchrouously.
70    pub fn can_async(&self) -> bool {
71        match self.0 {
72            GetEntriesFor::SendAppend { .. } => true,
73            GetEntriesFor::Empty(can_async) => can_async,
74            _ => false,
75        }
76    }
77}
78
/// Identifies the raft-internal (or external) caller of `Storage::entries`.
#[derive(Debug)]
pub(crate) enum GetEntriesFor {
    /// For sending entries to followers.
    SendAppend {
        /// the peer id to which the entries are going to send
        to: u64,
        /// the term when the request is issued
        term: u64,
        /// whether to exhaust all the entries
        aggressively: bool,
    },
    /// For getting committed entries in a ready.
    GenReady,
    /// For getting entries to check pending conf when transferring leader.
    TransferLeader,
    /// For getting entries to check pending conf when forwarding commit index by vote messages.
    CommitByVote,
    /// Not called by raft itself; the flag records whether the external caller
    /// supports asynchronous fetching.
    Empty(bool),
}
99
/// Storage saves all the information about the current Raft implementation, including Raft Log,
/// commit index, the leader to vote for, etc.
///
/// If any Storage method returns an error, the raft instance will
/// become inoperable and refuse to participate in elections; the
/// application is responsible for cleanup and recovery in this case.
pub trait Storage {
    /// `initial_state` is called when Raft is initialized. This interface will return a `RaftState`
    /// which contains `HardState` and `ConfState`.
    ///
    /// `RaftState` could be initialized or not. If it's initialized it means the `Storage` is
    /// created with a configuration, and its last index and term should be greater than 0.
    fn initial_state(&self) -> Result<RaftState>;

    /// Returns a slice of log entries in the range `[low, high)`.
    /// `max_size` limits the total size of the log entries returned if not `None`, however
    /// the slice of entries returned will always have length at least 1 if entries are
    /// found in the range.
    ///
    /// Entries are supported to be fetched asynchronously depending on the context. Async is optional.
    /// Storage should check `context.can_async()` first and decide whether to fetch entries asynchronously
    /// based on its own implementation. If the entries are fetched asynchronously, storage should return
    /// `LogTemporarilyUnavailable`, and the application needs to call `on_entries_fetched(context)` to
    /// trigger a re-fetch of the entries after the storage finishes fetching the entries.
    ///
    /// # Panics
    ///
    /// Panics if `high` is higher than `Storage::last_index(&self) + 1`.
    fn entries(
        &self,
        low: u64,
        high: u64,
        max_size: impl Into<Option<u64>>,
        context: GetEntriesContext,
    ) -> Result<Vec<Entry>>;

    /// Returns the term of entry `idx`, which must be in the range
    /// `[first_index()-1, last_index()]`. The term of the entry before
    /// `first_index` is retained for matching purposes even though the
    /// rest of that entry may not be available.
    fn term(&self, idx: u64) -> Result<u64>;

    /// Returns the index of the first log entry that is possibly available via entries, which will
    /// always equal to `truncated index` plus 1.
    ///
    /// Newly created (but not initialized) `Storage` can be considered as truncated at 0 so that 1
    /// will be returned in this case.
    fn first_index(&self) -> Result<u64>;

    /// The index of the last entry replicated in the `Storage`.
    fn last_index(&self) -> Result<u64>;

    /// Returns the most recent snapshot.
    ///
    /// If snapshot is temporarily unavailable, it should return `SnapshotTemporarilyUnavailable`,
    /// so the raft state machine could know that Storage needs some time to prepare
    /// snapshot and call snapshot later.
    /// A snapshot's index must not be less than the `request_index`.
    /// `to` indicates which peer is requesting the snapshot.
    fn snapshot(&self, request_index: u64, to: u64) -> Result<Snapshot>;
}
161
/// The Memory Storage Core instance holds the actual state of the storage struct. To access this
/// value, use the `rl` and `wl` functions on the main MemStorage implementation.
#[derive(Default)]
pub struct MemStorageCore {
    raft_state: RaftState,
    // entries[i] has raft log position i+snapshot.get_metadata().index
    entries: Vec<Entry>,
    // Metadata of the last snapshot received.
    snapshot_metadata: SnapshotMetadata,
    // If it is true, the next snapshot will return a
    // SnapshotTemporarilyUnavailable error.
    trigger_snap_unavailable: bool,
    // If it is true, the next `entries` call from an async-capable context
    // will return a LogTemporarilyUnavailable error (see `Storage::entries`).
    trigger_log_unavailable: bool,
    // Context stashed by a deferred (async) `entries` call; retrieved with
    // `take_get_entries_context`.
    get_entries_context: Option<GetEntriesContext>,
}
179
impl MemStorageCore {
    /// Saves the current HardState.
    pub fn set_hardstate(&mut self, hs: HardState) {
        self.raft_state.hard_state = hs;
    }

    /// Get the hard state.
    pub fn hard_state(&self) -> &HardState {
        &self.raft_state.hard_state
    }

    /// Get the mut hard state.
    pub fn mut_hard_state(&mut self) -> &mut HardState {
        &mut self.raft_state.hard_state
    }

    /// Commit to an index. Updates both the commit index and the hard-state
    /// term to the committed entry's term.
    ///
    /// # Panics
    ///
    /// Panics if there is no such entry in raft logs.
    pub fn commit_to(&mut self, index: u64) -> Result<()> {
        assert!(
            self.has_entry_at(index),
            "commit_to {} but the entry does not exist",
            index
        );

        // Offset of `index` into `entries`; `entries[0]` is safe to read
        // because `has_entry_at` requires a non-empty log.
        let diff = (index - self.entries[0].index) as usize;
        self.raft_state.hard_state.commit = index;
        self.raft_state.hard_state.term = self.entries[diff].term;
        Ok(())
    }

    /// Saves the current conf state.
    pub fn set_conf_state(&mut self, cs: ConfState) {
        self.raft_state.conf_state = cs;
    }

    // True iff `index` is physically present in `entries`
    // (neither compacted away nor beyond the last received entry).
    #[inline]
    fn has_entry_at(&self, index: u64) -> bool {
        !self.entries.is_empty() && index >= self.first_index() && index <= self.last_index()
    }

    // First physically available index; when the in-memory log is empty this
    // falls back to the snapshot index + 1.
    fn first_index(&self) -> u64 {
        match self.entries.first() {
            Some(e) => e.index,
            None => self.snapshot_metadata.index + 1,
        }
    }

    // Last physically available index; when the in-memory log is empty this
    // falls back to the snapshot index itself.
    fn last_index(&self) -> u64 {
        match self.entries.last() {
            Some(e) => e.index,
            None => self.snapshot_metadata.index,
        }
    }

    /// Overwrites the contents of this Storage object with those of the given snapshot.
    ///
    /// # Errors
    ///
    /// Returns `StorageError::SnapshotOutOfDate` if the snapshot index is less
    /// than the storage's first index.
    pub fn apply_snapshot(&mut self, mut snapshot: Snapshot) -> Result<()> {
        let mut meta = snapshot.take_metadata();
        let index = meta.index;

        // Reject snapshots older than what is already stored.
        if self.first_index() > index {
            return Err(Error::Store(StorageError::SnapshotOutOfDate));
        }

        self.snapshot_metadata = meta.clone();

        // Term never regresses; commit jumps to the snapshot index.
        self.raft_state.hard_state.term = cmp::max(self.raft_state.hard_state.term, meta.term);
        self.raft_state.hard_state.commit = index;
        // Every entry is now covered by the snapshot, so drop the log.
        self.entries.clear();

        // Update conf states.
        self.raft_state.conf_state = meta.take_conf_state();
        Ok(())
    }

    // Builds a snapshot at the current commit index. It carries metadata only
    // (index, term, conf state) — no application data.
    fn snapshot(&self) -> Snapshot {
        let mut snapshot = Snapshot::default();

        // We assume all entries whose indexes are less than `hard_state.commit`
        // have been applied, so use the latest commit index to construct the snapshot.
        // TODO: This is not true for async ready.
        let meta = snapshot.mut_metadata();
        meta.index = self.raft_state.hard_state.commit;
        meta.term = match meta.index.cmp(&self.snapshot_metadata.index) {
            cmp::Ordering::Equal => self.snapshot_metadata.term,
            cmp::Ordering::Greater => {
                // Commit is ahead of the old snapshot, so the committed entry
                // is still in `entries`; take its term.
                let offset = self.entries[0].index;
                self.entries[(meta.index - offset) as usize].term
            }
            cmp::Ordering::Less => {
                // Commit behind the stored snapshot is an invariant violation.
                panic!(
                    "commit {} < snapshot_metadata.index {}",
                    meta.index, self.snapshot_metadata.index
                );
            }
        };

        meta.set_conf_state(self.raft_state.conf_state.clone());
        snapshot
    }

    /// Discards all log entries prior to compact_index.
    /// It is the application's responsibility to not attempt to compact an index
    /// greater than RaftLog.applied.
    ///
    /// # Panics
    ///
    /// Panics if `compact_index` is higher than `Storage::last_index(&self) + 1`.
    pub fn compact(&mut self, compact_index: u64) -> Result<()> {
        if compact_index <= self.first_index() {
            // Don't need to treat this case as an error.
            return Ok(());
        }

        if compact_index > self.last_index() + 1 {
            panic!(
                "compact not received raft logs: {}, last index: {}",
                compact_index,
                self.last_index()
            );
        }

        if let Some(entry) = self.entries.first() {
            // Drop everything below `compact_index`; the entry at
            // `compact_index` becomes the new first entry.
            let offset = compact_index - entry.index;
            self.entries.drain(..offset as usize);
        }
        Ok(())
    }

    /// Append the new entries to storage.
    ///
    /// # Panics
    ///
    /// Panics if `ents` contains compacted entries, or there's a gap between `ents` and the last
    /// received entry in the storage.
    pub fn append(&mut self, ents: &[Entry]) -> Result<()> {
        if ents.is_empty() {
            return Ok(());
        }
        if self.first_index() > ents[0].index {
            panic!(
                "overwrite compacted raft logs, compacted: {}, append: {}",
                self.first_index() - 1,
                ents[0].index,
            );
        }
        if self.last_index() + 1 < ents[0].index {
            panic!(
                "raft logs should be continuous, last index: {}, new appended: {}",
                self.last_index(),
                ents[0].index,
            );
        }

        // Remove all entries overwritten by `ents`.
        let diff = ents[0].index - self.first_index();
        self.entries.drain(diff as usize..);
        self.entries.extend_from_slice(ents);
        Ok(())
    }

    /// Commit to `idx` and set configuration to the given states. Only used for tests.
    pub fn commit_to_and_set_conf_states(&mut self, idx: u64, cs: Option<ConfState>) -> Result<()> {
        self.commit_to(idx)?;
        if let Some(cs) = cs {
            self.raft_state.conf_state = cs;
        }
        Ok(())
    }

    /// Trigger a SnapshotTemporarilyUnavailable error.
    ///
    /// The flag is consumed (reset) by the next `Storage::snapshot` call.
    pub fn trigger_snap_unavailable(&mut self) {
        self.trigger_snap_unavailable = true;
    }

    /// Set a LogTemporarilyUnavailable error.
    ///
    /// Unlike the snapshot trigger, this flag is not self-resetting; pass
    /// `false` to clear it.
    pub fn trigger_log_unavailable(&mut self, v: bool) {
        self.trigger_log_unavailable = v;
    }

    /// Take get entries context.
    ///
    /// Returns the context stashed by a deferred (async) `entries` call, if any.
    pub fn take_get_entries_context(&mut self) -> Option<GetEntriesContext> {
        self.get_entries_context.take()
    }
}
372
/// `MemStorage` is a thread-safe but incomplete implementation of `Storage`, mainly for tests.
///
/// A real `Storage` should save both raft logs and applied data. However `MemStorage` only
/// contains raft logs. So you can call `MemStorage::append` to persist new received unstable raft
/// logs and then access them with `Storage` APIs. The only exception is `Storage::snapshot`. There
/// is no data in `Snapshot` returned by `MemStorage::snapshot` because applied data is not stored
/// in `MemStorage`.
#[derive(Clone, Default)]
pub struct MemStorage {
    // Shared lock-protected core; cloning a `MemStorage` yields another handle
    // to the same underlying state.
    core: Arc<RwLock<MemStorageCore>>,
}
384
385impl MemStorage {
386    /// Returns a new memory storage value.
387    pub fn new() -> MemStorage {
388        MemStorage {
389            ..Default::default()
390        }
391    }
392
393    /// Create a new `MemStorage` with a given `Config`. The given `Config` will be used to
394    /// initialize the storage.
395    ///
396    /// You should use the same input to initialize all nodes.
397    pub fn new_with_conf_state<T>(conf_state: T) -> MemStorage
398    where
399        ConfState: From<T>,
400    {
401        let store = MemStorage::new();
402        store.initialize_with_conf_state(conf_state);
403        store
404    }
405
406    /// Initialize a `MemStorage` with a given `Config`.
407    ///
408    /// You should use the same input to initialize all nodes.
409    pub fn initialize_with_conf_state<T>(&self, conf_state: T)
410    where
411        ConfState: From<T>,
412    {
413        assert!(!self.initial_state().unwrap().initialized());
414        let mut core = self.wl();
415        // Setting initial state is very important to build a correct raft, as raft algorithm
416        // itself only guarantees logs consistency. Typically, you need to ensure either all start
417        // states are the same on all nodes, or new nodes always catch up logs by snapshot first.
418        //
419        // In practice, we choose the second way by assigning non-zero index to first index. Here
420        // we choose the first way for historical reason and easier to write tests.
421        core.raft_state.conf_state = ConfState::from(conf_state);
422    }
423
424    /// Opens up a read lock on the storage and returns a guard handle. Use this
425    /// with functions that don't require mutation.
426    pub fn rl(&self) -> RwLockReadGuard<'_, MemStorageCore> {
427        self.core.read().unwrap()
428    }
429
430    /// Opens up a write lock on the storage and returns guard handle. Use this
431    /// with functions that take a mutable reference to self.
432    pub fn wl(&self) -> RwLockWriteGuard<'_, MemStorageCore> {
433        self.core.write().unwrap()
434    }
435}
436
437impl Storage for MemStorage {
438    /// Implements the Storage trait.
439    fn initial_state(&self) -> Result<RaftState> {
440        Ok(self.rl().raft_state.clone())
441    }
442
443    /// Implements the Storage trait.
444    fn entries(
445        &self,
446        low: u64,
447        high: u64,
448        max_size: impl Into<Option<u64>>,
449        context: GetEntriesContext,
450    ) -> Result<Vec<Entry>> {
451        let max_size = max_size.into();
452        let mut core = self.wl();
453        if low < core.first_index() {
454            return Err(Error::Store(StorageError::Compacted));
455        }
456
457        if high > core.last_index() + 1 {
458            panic!(
459                "index out of bound (last: {}, high: {})",
460                core.last_index() + 1,
461                high
462            );
463        }
464
465        if core.trigger_log_unavailable && context.can_async() {
466            core.get_entries_context = Some(context);
467            return Err(Error::Store(StorageError::LogTemporarilyUnavailable));
468        }
469
470        let offset = core.entries[0].index;
471        let lo = (low - offset) as usize;
472        let hi = (high - offset) as usize;
473        let mut ents = core.entries[lo..hi].to_vec();
474        limit_size(&mut ents, max_size);
475        Ok(ents)
476    }
477
478    /// Implements the Storage trait.
479    fn term(&self, idx: u64) -> Result<u64> {
480        let core = self.rl();
481        if idx == core.snapshot_metadata.index {
482            return Ok(core.snapshot_metadata.term);
483        }
484
485        let offset = core.first_index();
486        if idx < offset {
487            return Err(Error::Store(StorageError::Compacted));
488        }
489
490        if idx > core.last_index() {
491            return Err(Error::Store(StorageError::Unavailable));
492        }
493        Ok(core.entries[(idx - offset) as usize].term)
494    }
495
496    /// Implements the Storage trait.
497    fn first_index(&self) -> Result<u64> {
498        Ok(self.rl().first_index())
499    }
500
501    /// Implements the Storage trait.
502    fn last_index(&self) -> Result<u64> {
503        Ok(self.rl().last_index())
504    }
505
506    /// Implements the Storage trait.
507    fn snapshot(&self, request_index: u64, _to: u64) -> Result<Snapshot> {
508        let mut core = self.wl();
509        if core.trigger_snap_unavailable {
510            core.trigger_snap_unavailable = false;
511            Err(Error::Store(StorageError::SnapshotTemporarilyUnavailable))
512        } else {
513            let mut snap = core.snapshot();
514            if snap.get_metadata().index < request_index {
515                snap.mut_metadata().index = request_index;
516            }
517            Ok(snap)
518        }
519    }
520}
521
#[cfg(test)]
mod test {
    use std::panic::{self, AssertUnwindSafe};

    use protobuf::Message as PbMessage;

    use crate::eraftpb::{ConfState, Entry, Snapshot};
    use crate::errors::{Error as RaftError, StorageError};

    use super::{GetEntriesContext, MemStorage, Storage};

    // Helper: build a log entry with the given index and term (empty payload).
    fn new_entry(index: u64, term: u64) -> Entry {
        let mut e = Entry::default();
        e.term = term;
        e.index = index;
        e
    }

    // Helper: serialized protobuf size of a message, used for `max_size` cases.
    fn size_of<T: PbMessage>(m: &T) -> u32 {
        m.compute_size()
    }

    // Helper: build a snapshot with the given metadata and voter list.
    fn new_snapshot(index: u64, term: u64, voters: Vec<u64>) -> Snapshot {
        let mut s = Snapshot::default();
        s.mut_metadata().index = index;
        s.mut_metadata().term = term;
        s.mut_metadata().mut_conf_state().voters = voters;
        s
    }

    #[test]
    fn test_storage_term() {
        let ents = vec![new_entry(3, 3), new_entry(4, 4), new_entry(5, 5)];
        // (index to query, expected term result)
        let mut tests = vec![
            (2, Err(RaftError::Store(StorageError::Compacted))),
            (3, Ok(3)),
            (4, Ok(4)),
            (5, Ok(5)),
            (6, Err(RaftError::Store(StorageError::Unavailable))),
        ];

        for (i, (idx, wterm)) in tests.drain(..).enumerate() {
            let storage = MemStorage::new();
            storage.wl().entries = ents.clone();

            let t = storage.term(idx);
            if t != wterm {
                panic!("#{}: expect res {:?}, got {:?}", i, wterm, t);
            }
        }
    }

    #[test]
    fn test_storage_entries() {
        let ents = vec![
            new_entry(3, 3),
            new_entry(4, 4),
            new_entry(5, 5),
            new_entry(6, 6),
        ];
        let max_u64 = u64::max_value();
        // (low, high, max_size, expected result)
        let mut tests = vec![
            (
                2,
                6,
                max_u64,
                Err(RaftError::Store(StorageError::Compacted)),
            ),
            (3, 4, max_u64, Ok(vec![new_entry(3, 3)])),
            (4, 5, max_u64, Ok(vec![new_entry(4, 4)])),
            (4, 6, max_u64, Ok(vec![new_entry(4, 4), new_entry(5, 5)])),
            (
                4,
                7,
                max_u64,
                Ok(vec![new_entry(4, 4), new_entry(5, 5), new_entry(6, 6)]),
            ),
            // even if maxsize is zero, the first entry should be returned
            (4, 7, 0, Ok(vec![new_entry(4, 4)])),
            // limit to 2
            (
                4,
                7,
                u64::from(size_of(&ents[1]) + size_of(&ents[2])),
                Ok(vec![new_entry(4, 4), new_entry(5, 5)]),
            ),
            (
                4,
                7,
                u64::from(size_of(&ents[1]) + size_of(&ents[2]) + size_of(&ents[3]) / 2),
                Ok(vec![new_entry(4, 4), new_entry(5, 5)]),
            ),
            (
                4,
                7,
                u64::from(size_of(&ents[1]) + size_of(&ents[2]) + size_of(&ents[3]) - 1),
                Ok(vec![new_entry(4, 4), new_entry(5, 5)]),
            ),
            // all
            (
                4,
                7,
                u64::from(size_of(&ents[1]) + size_of(&ents[2]) + size_of(&ents[3])),
                Ok(vec![new_entry(4, 4), new_entry(5, 5), new_entry(6, 6)]),
            ),
        ];
        for (i, (lo, hi, maxsize, wentries)) in tests.drain(..).enumerate() {
            let storage = MemStorage::new();
            storage.wl().entries = ents.clone();
            let e = storage.entries(lo, hi, maxsize, GetEntriesContext::empty(false));
            if e != wentries {
                panic!("#{}: expect entries {:?}, got {:?}", i, wentries, e);
            }
        }
    }

    #[test]
    fn test_storage_last_index() {
        let ents = vec![new_entry(3, 3), new_entry(4, 4), new_entry(5, 5)];
        let storage = MemStorage::new();
        storage.wl().entries = ents;

        let wresult = Ok(5);
        let result = storage.last_index();
        if result != wresult {
            panic!("want {:?}, got {:?}", wresult, result);
        }

        // Appending one more entry advances the last index.
        storage.wl().append(&[new_entry(6, 5)]).unwrap();
        let wresult = Ok(6);
        let result = storage.last_index();
        if result != wresult {
            panic!("want {:?}, got {:?}", wresult, result);
        }
    }

    #[test]
    fn test_storage_first_index() {
        let ents = vec![new_entry(3, 3), new_entry(4, 4), new_entry(5, 5)];
        let storage = MemStorage::new();
        storage.wl().entries = ents;

        assert_eq!(storage.first_index(), Ok(3));
        // Compacting up to 4 discards entry 3, moving the first index forward.
        storage.wl().compact(4).unwrap();
        assert_eq!(storage.first_index(), Ok(4));
    }

    #[test]
    fn test_storage_compact() {
        let ents = vec![new_entry(3, 3), new_entry(4, 4), new_entry(5, 5)];
        // (compact index, expected first index, expected first term, expected len)
        let mut tests = vec![(2, 3, 3, 3), (3, 3, 3, 3), (4, 4, 4, 2), (5, 5, 5, 1)];
        for (i, (idx, windex, wterm, wlen)) in tests.drain(..).enumerate() {
            let storage = MemStorage::new();
            storage.wl().entries = ents.clone();

            storage.wl().compact(idx).unwrap();
            let index = storage.first_index().unwrap();
            if index != windex {
                panic!("#{}: want {}, index {}", i, windex, index);
            }
            let term = if let Ok(v) =
                storage.entries(index, index + 1, 1, GetEntriesContext::empty(false))
            {
                v.first().map_or(0, |e| e.term)
            } else {
                0
            };
            if term != wterm {
                panic!("#{}: want {}, term {}", i, wterm, term);
            }
            let last = storage.last_index().unwrap();
            let len = storage
                .entries(index, last + 1, 100, GetEntriesContext::empty(false))
                .unwrap()
                .len();
            if len != wlen {
                panic!("#{}: want {}, term {}", i, wlen, len);
            }
        }
    }

    #[test]
    fn test_storage_create_snapshot() {
        let ents = vec![new_entry(3, 3), new_entry(4, 4), new_entry(5, 5)];
        let nodes = vec![1, 2, 3];
        let mut conf_state = ConfState::default();
        conf_state.voters = nodes.clone();

        let unavailable = Err(RaftError::Store(
            StorageError::SnapshotTemporarilyUnavailable,
        ));
        // (commit index, expected snapshot result, requested snapshot index)
        let mut tests = vec![
            (4, Ok(new_snapshot(4, 4, nodes.clone())), 0),
            (5, Ok(new_snapshot(5, 5, nodes.clone())), 5),
            (5, Ok(new_snapshot(6, 5, nodes)), 6),
            (5, unavailable, 6),
        ];
        for (i, (idx, wresult, windex)) in tests.drain(..).enumerate() {
            let storage = MemStorage::new();
            storage.wl().entries = ents.clone();
            storage.wl().raft_state.hard_state.commit = idx;
            storage.wl().raft_state.hard_state.term = idx;
            storage.wl().raft_state.conf_state = conf_state.clone();

            if wresult.is_err() {
                storage.wl().trigger_snap_unavailable();
            }

            let result = storage.snapshot(windex, 0);
            if result != wresult {
                panic!("#{}: want {:?}, got {:?}", i, wresult, result);
            }
        }
    }

    #[test]
    fn test_storage_append() {
        let ents = vec![new_entry(3, 3), new_entry(4, 4), new_entry(5, 5)];
        // (entries to append, expected resulting log; None means append must panic)
        let mut tests = vec![
            (
                vec![new_entry(3, 3), new_entry(4, 4), new_entry(5, 5)],
                Some(vec![new_entry(3, 3), new_entry(4, 4), new_entry(5, 5)]),
            ),
            (
                vec![new_entry(3, 3), new_entry(4, 6), new_entry(5, 6)],
                Some(vec![new_entry(3, 3), new_entry(4, 6), new_entry(5, 6)]),
            ),
            (
                vec![
                    new_entry(3, 3),
                    new_entry(4, 4),
                    new_entry(5, 5),
                    new_entry(6, 5),
                ],
                Some(vec![
                    new_entry(3, 3),
                    new_entry(4, 4),
                    new_entry(5, 5),
                    new_entry(6, 5),
                ]),
            ),
            // overwrite compacted raft logs is not allowed
            (
                vec![new_entry(2, 3), new_entry(3, 3), new_entry(4, 5)],
                None,
            ),
            // truncate the existing entries and append
            (
                vec![new_entry(4, 5)],
                Some(vec![new_entry(3, 3), new_entry(4, 5)]),
            ),
            // direct append
            (
                vec![new_entry(6, 6)],
                Some(vec![
                    new_entry(3, 3),
                    new_entry(4, 4),
                    new_entry(5, 5),
                    new_entry(6, 6),
                ]),
            ),
        ];
        for (i, (entries, wentries)) in tests.drain(..).enumerate() {
            let storage = MemStorage::new();
            storage.wl().entries = ents.clone();
            // `append` panics on invalid input, so run it under catch_unwind.
            let res = panic::catch_unwind(AssertUnwindSafe(|| storage.wl().append(&entries)));
            if let Some(wentries) = wentries {
                let _ = res.unwrap();
                let e = &storage.wl().entries;
                if *e != wentries {
                    panic!("#{}: want {:?}, entries {:?}", i, wentries, e);
                }
            } else {
                res.unwrap_err();
            }
        }
    }

    #[test]
    fn test_storage_apply_snapshot() {
        let nodes = vec![1, 2, 3];
        let storage = MemStorage::new();

        // Apply snapshot successfully
        let snap = new_snapshot(4, 4, nodes.clone());
        storage.wl().apply_snapshot(snap).unwrap();

        // Apply snapshot fails due to StorageError::SnapshotOutOfDate
        let snap = new_snapshot(3, 3, nodes);
        storage.wl().apply_snapshot(snap).unwrap_err();
    }
}