raft_consensus/persistent_log/
mem.rs1use std::io::{Read, Write};
2use std::result;
3
4use persistent_log::{Error, Log};
5use {LogIndex, ServerId, Term};
6
7#[derive(Clone, Debug)]
15pub struct MemLog {
16 current_term: Term,
17 voted_for: Option<ServerId>,
18 entries: Vec<(Term, Vec<u8>)>,
19}
20
21impl MemLog {
22 pub fn new() -> MemLog {
23 MemLog {
24 current_term: Term(0),
25 voted_for: None,
26 entries: Vec::new(),
27 }
28 }
29}
30
31impl Log for MemLog {
32 type Error = Error;
33
34 fn current_term(&self) -> result::Result<Term, Error> {
35 Ok(self.current_term)
36 }
37
38 fn set_current_term(&mut self, term: Term) -> result::Result<(), Error> {
39 self.voted_for = None;
40 self.current_term = term;
41 Ok(())
42 }
43
44 fn inc_current_term(&mut self) -> result::Result<Term, Error> {
45 self.voted_for = None;
46 self.current_term = self.current_term + 1;
47 self.current_term()
48 }
49
50 fn voted_for(&self) -> result::Result<Option<ServerId>, Error> {
51 Ok(self.voted_for)
52 }
53
54 fn set_voted_for(&mut self, address: ServerId) -> result::Result<(), Error> {
55 self.voted_for = Some(address);
56 Ok(())
57 }
58
59 fn latest_log_index(&self) -> result::Result<LogIndex, Error> {
60 Ok(LogIndex(self.entries.len() as u64))
61 }
62
63 fn latest_log_term(&self) -> result::Result<Term, Error> {
64 let len = self.entries.len();
65 if len == 0 {
66 Ok(Term::from(0))
67 } else {
68 Ok(self.entries[len - 1].0)
69 }
70 }
71
72 fn entry<W: Write>(&self, index: LogIndex, buf: Option<W>) -> Result<Term, Error> {
73 match self.entries.get((index - 1).as_u64() as usize) {
74 Some(&(term, ref bytes)) => {
75 if let Some(mut buf) = buf {
76 buf.write_all(&bytes)?;
77 };
78 Ok(term)
79 }
80 None => Err(Error::BadIndex),
81 }
82 }
83
84 fn append_entries<R: Read, I: Iterator<Item = (Term, R)>>(
85 &mut self,
86 from: LogIndex,
87 entries: I,
88 ) -> result::Result<(), Self::Error> {
89 if self.latest_log_index()? + 1 < from {
90 return Err(Error::BadLogIndex);
91 }
92
93 let mut entries_vec = Vec::new();
95 for (term, mut reader) in entries {
96 let mut v = Vec::new();
97 reader.read_to_end(&mut v)?;
98 entries_vec.push((term, v));
99 }
100 self.entries.truncate((from - 1).as_u64() as usize);
101 self.entries.extend(entries_vec.into_iter());
102 Ok(())
103 }
104}
105
106#[cfg(test)]
107mod test {
108
109 use super::*;
110
111 use persistent_log::{append_entries, get_entry, Log};
112 use {LogIndex, ServerId, Term};
113
114 #[test]
115 fn test_current_term() {
116 let mut store = MemLog::new();
117 assert_eq!(Term(0), store.current_term().unwrap());
118 store.set_voted_for(ServerId::from(0)).unwrap();
119 store.set_current_term(Term(42)).unwrap();
120 assert_eq!(None, store.voted_for().unwrap());
121 assert_eq!(Term(42), store.current_term().unwrap());
122 store.inc_current_term().unwrap();
123 assert_eq!(Term(43), store.current_term().unwrap());
124 }
125
126 #[test]
127 fn test_voted_for() {
128 let mut store = MemLog::new();
129 assert_eq!(None, store.voted_for().unwrap());
130 let id = ServerId::from(0);
131 store.set_voted_for(id).unwrap();
132 assert_eq!(Some(id), store.voted_for().unwrap());
133 }
134
135 #[test]
136 fn test_append_entries() {
137 let mut store = MemLog::new();
138 assert_eq!(LogIndex::from(0), store.latest_log_index().unwrap());
139 assert_eq!(Term::from(0), store.latest_log_term().unwrap());
140
141 append_entries(
143 &mut store,
144 LogIndex(1),
145 &[
146 (Term::from(0), &[1]),
147 (Term::from(0), &[2]),
148 (Term::from(0), &[3]),
149 (Term::from(1), &[4]),
150 ],
151 ).unwrap();
152 assert_eq!(LogIndex::from(4), store.latest_log_index().unwrap());
153 assert_eq!(Term::from(1), store.latest_log_term().unwrap());
154
155 assert_eq!(
156 (Term::from(0), vec![1u8]),
157 get_entry(&store, LogIndex::from(1))
158 );
159 assert_eq!(
160 (Term::from(0), vec![2u8]),
161 get_entry(&store, LogIndex::from(2))
162 );
163 assert_eq!(
164 (Term::from(0), vec![3u8]),
165 get_entry(&store, LogIndex::from(3))
166 );
167 assert_eq!(
168 (Term::from(1), vec![4u8]),
169 get_entry(&store, LogIndex::from(4))
170 );
171
172 append_entries(&mut store, LogIndex::from(4), &[]).unwrap();
174 assert_eq!(LogIndex(3), store.latest_log_index().unwrap());
175 assert_eq!(Term::from(0), store.latest_log_term().unwrap());
176 assert_eq!(
177 (Term::from(0), vec![1u8]),
178 get_entry(&store, LogIndex::from(1))
179 );
180 assert_eq!(
181 (Term::from(0), vec![2u8]),
182 get_entry(&store, LogIndex::from(2))
183 );
184 assert_eq!(
185 (Term::from(0), vec![3u8]),
186 get_entry(&store, LogIndex::from(3))
187 );
188
189 append_entries(
191 &mut store,
192 LogIndex::from(3),
193 &[(Term(2), &[3]), (Term(3), &[4])],
194 ).unwrap();
195 assert_eq!(LogIndex(4), store.latest_log_index().unwrap());
196 assert_eq!(Term::from(3), store.latest_log_term().unwrap());
197 assert_eq!(
198 (Term::from(0), vec![1u8]),
199 get_entry(&store, LogIndex::from(1))
200 );
201 assert_eq!(
202 (Term::from(0), vec![2u8]),
203 get_entry(&store, LogIndex::from(2))
204 );
205 assert_eq!(
206 (Term::from(2), vec![3u8]),
207 get_entry(&store, LogIndex::from(3))
208 );
209 assert_eq!(
210 (Term::from(3), vec![4u8]),
211 get_entry(&store, LogIndex::from(4))
212 );
213 }
214}
215
216impl Default for MemLog {
217 fn default() -> Self {
218 MemLog::new()
219 }
220}