raft_consensus/persistent_log/
mem.rs

1use std::io::{Read, Write};
2use std::result;
3
4use persistent_log::{Error, Log};
5use {LogIndex, ServerId, Term};
6
7/// This is a `Log` implementation that stores entries in a simple in-memory vector. Other data
8/// is stored in a struct. It is chiefly intended for testing.
9///
10/// # Panic
11///
12/// No bounds checking is performed and attempted access to non-existing log
13/// indexes will panic.
14#[derive(Clone, Debug)]
15pub struct MemLog {
16    current_term: Term,
17    voted_for: Option<ServerId>,
18    entries: Vec<(Term, Vec<u8>)>,
19}
20
21impl MemLog {
22    pub fn new() -> MemLog {
23        MemLog {
24            current_term: Term(0),
25            voted_for: None,
26            entries: Vec::new(),
27        }
28    }
29}
30
31impl Log for MemLog {
32    type Error = Error;
33
34    fn current_term(&self) -> result::Result<Term, Error> {
35        Ok(self.current_term)
36    }
37
38    fn set_current_term(&mut self, term: Term) -> result::Result<(), Error> {
39        self.voted_for = None;
40        self.current_term = term;
41        Ok(())
42    }
43
44    fn inc_current_term(&mut self) -> result::Result<Term, Error> {
45        self.voted_for = None;
46        self.current_term = self.current_term + 1;
47        self.current_term()
48    }
49
50    fn voted_for(&self) -> result::Result<Option<ServerId>, Error> {
51        Ok(self.voted_for)
52    }
53
54    fn set_voted_for(&mut self, address: ServerId) -> result::Result<(), Error> {
55        self.voted_for = Some(address);
56        Ok(())
57    }
58
59    fn latest_log_index(&self) -> result::Result<LogIndex, Error> {
60        Ok(LogIndex(self.entries.len() as u64))
61    }
62
63    fn latest_log_term(&self) -> result::Result<Term, Error> {
64        let len = self.entries.len();
65        if len == 0 {
66            Ok(Term::from(0))
67        } else {
68            Ok(self.entries[len - 1].0)
69        }
70    }
71
72    fn entry<W: Write>(&self, index: LogIndex, buf: Option<W>) -> Result<Term, Error> {
73        match self.entries.get((index - 1).as_u64() as usize) {
74            Some(&(term, ref bytes)) => {
75                if let Some(mut buf) = buf {
76                    buf.write_all(&bytes)?;
77                };
78                Ok(term)
79            }
80            None => Err(Error::BadIndex),
81        }
82    }
83
84    fn append_entries<R: Read, I: Iterator<Item = (Term, R)>>(
85        &mut self,
86        from: LogIndex,
87        entries: I,
88    ) -> result::Result<(), Self::Error> {
89        if self.latest_log_index()? + 1 < from {
90            return Err(Error::BadLogIndex);
91        }
92
93        // TODO remove vector hack
94        let mut entries_vec = Vec::new();
95        for (term, mut reader) in entries {
96            let mut v = Vec::new();
97            reader.read_to_end(&mut v)?;
98            entries_vec.push((term, v));
99        }
100        self.entries.truncate((from - 1).as_u64() as usize);
101        self.entries.extend(entries_vec.into_iter());
102        Ok(())
103    }
104}
105
106#[cfg(test)]
107mod test {
108
109    use super::*;
110
111    use persistent_log::{append_entries, get_entry, Log};
112    use {LogIndex, ServerId, Term};
113
114    #[test]
115    fn test_current_term() {
116        let mut store = MemLog::new();
117        assert_eq!(Term(0), store.current_term().unwrap());
118        store.set_voted_for(ServerId::from(0)).unwrap();
119        store.set_current_term(Term(42)).unwrap();
120        assert_eq!(None, store.voted_for().unwrap());
121        assert_eq!(Term(42), store.current_term().unwrap());
122        store.inc_current_term().unwrap();
123        assert_eq!(Term(43), store.current_term().unwrap());
124    }
125
126    #[test]
127    fn test_voted_for() {
128        let mut store = MemLog::new();
129        assert_eq!(None, store.voted_for().unwrap());
130        let id = ServerId::from(0);
131        store.set_voted_for(id).unwrap();
132        assert_eq!(Some(id), store.voted_for().unwrap());
133    }
134
135    #[test]
136    fn test_append_entries() {
137        let mut store = MemLog::new();
138        assert_eq!(LogIndex::from(0), store.latest_log_index().unwrap());
139        assert_eq!(Term::from(0), store.latest_log_term().unwrap());
140
141        // [0.1, 0.2, 0.3, 1.4]
142        append_entries(
143            &mut store,
144            LogIndex(1),
145            &[
146                (Term::from(0), &[1]),
147                (Term::from(0), &[2]),
148                (Term::from(0), &[3]),
149                (Term::from(1), &[4]),
150            ],
151        ).unwrap();
152        assert_eq!(LogIndex::from(4), store.latest_log_index().unwrap());
153        assert_eq!(Term::from(1), store.latest_log_term().unwrap());
154
155        assert_eq!(
156            (Term::from(0), vec![1u8]),
157            get_entry(&store, LogIndex::from(1))
158        );
159        assert_eq!(
160            (Term::from(0), vec![2u8]),
161            get_entry(&store, LogIndex::from(2))
162        );
163        assert_eq!(
164            (Term::from(0), vec![3u8]),
165            get_entry(&store, LogIndex::from(3))
166        );
167        assert_eq!(
168            (Term::from(1), vec![4u8]),
169            get_entry(&store, LogIndex::from(4))
170        );
171
172        // [0.1, 0.2, 0.3]
173        append_entries(&mut store, LogIndex::from(4), &[]).unwrap();
174        assert_eq!(LogIndex(3), store.latest_log_index().unwrap());
175        assert_eq!(Term::from(0), store.latest_log_term().unwrap());
176        assert_eq!(
177            (Term::from(0), vec![1u8]),
178            get_entry(&store, LogIndex::from(1))
179        );
180        assert_eq!(
181            (Term::from(0), vec![2u8]),
182            get_entry(&store, LogIndex::from(2))
183        );
184        assert_eq!(
185            (Term::from(0), vec![3u8]),
186            get_entry(&store, LogIndex::from(3))
187        );
188
189        // [0.1, 0.2, 2.3, 3.4]
190        append_entries(
191            &mut store,
192            LogIndex::from(3),
193            &[(Term(2), &[3]), (Term(3), &[4])],
194        ).unwrap();
195        assert_eq!(LogIndex(4), store.latest_log_index().unwrap());
196        assert_eq!(Term::from(3), store.latest_log_term().unwrap());
197        assert_eq!(
198            (Term::from(0), vec![1u8]),
199            get_entry(&store, LogIndex::from(1))
200        );
201        assert_eq!(
202            (Term::from(0), vec![2u8]),
203            get_entry(&store, LogIndex::from(2))
204        );
205        assert_eq!(
206            (Term::from(2), vec![3u8]),
207            get_entry(&store, LogIndex::from(3))
208        );
209        assert_eq!(
210            (Term::from(3), vec![4u8]),
211            get_entry(&store, LogIndex::from(4))
212        );
213    }
214}
215
216impl Default for MemLog {
217    fn default() -> Self {
218        MemLog::new()
219    }
220}