1use crate::{LogIndex, RaftError, RaftResult, Term};
10use serde::{Deserialize, Serialize};
11use std::collections::VecDeque;
12
13#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
15pub struct LogEntry {
16 pub term: Term,
18
19 pub index: LogIndex,
21
22 pub command: Vec<u8>,
24}
25
26impl LogEntry {
27 pub fn new(term: Term, index: LogIndex, command: Vec<u8>) -> Self {
29 Self {
30 term,
31 index,
32 command,
33 }
34 }
35}
36
37#[derive(Debug, Clone, Serialize, Deserialize)]
39pub struct Snapshot {
40 pub last_included_index: LogIndex,
42
43 pub last_included_term: Term,
45
46 pub data: Vec<u8>,
48
49 pub configuration: Vec<String>,
51}
52
53#[derive(Debug, Clone, Serialize, Deserialize)]
55pub struct RaftLog {
56 entries: VecDeque<LogEntry>,
58
59 snapshot: Option<Snapshot>,
61
62 base_index: LogIndex,
64
65 base_term: Term,
67}
68
69impl RaftLog {
70 pub fn new() -> Self {
72 Self {
73 entries: VecDeque::new(),
74 snapshot: None,
75 base_index: 0,
76 base_term: 0,
77 }
78 }
79
80 pub fn last_index(&self) -> LogIndex {
82 if let Some(entry) = self.entries.back() {
83 entry.index
84 } else {
85 self.base_index
86 }
87 }
88
89 pub fn last_term(&self) -> Term {
91 if let Some(entry) = self.entries.back() {
92 entry.term
93 } else {
94 self.base_term
95 }
96 }
97
98 pub fn term_at(&self, index: LogIndex) -> Option<Term> {
100 if index == self.base_index {
101 return Some(self.base_term);
102 }
103
104 if index < self.base_index {
105 return None;
106 }
107
108 let offset = (index - self.base_index - 1) as usize;
109 self.entries.get(offset).map(|entry| entry.term)
110 }
111
112 pub fn get(&self, index: LogIndex) -> Option<&LogEntry> {
114 if index <= self.base_index {
115 return None;
116 }
117
118 let offset = (index - self.base_index - 1) as usize;
119 self.entries.get(offset)
120 }
121
122 pub fn entries_from(&self, start_index: LogIndex) -> Vec<LogEntry> {
124 if start_index <= self.base_index {
125 return self.entries.iter().cloned().collect();
126 }
127
128 let offset = (start_index - self.base_index - 1) as usize;
129 self.entries.iter().skip(offset).cloned().collect()
130 }
131
132 pub fn append(&mut self, term: Term, command: Vec<u8>) -> LogIndex {
134 let index = self.last_index() + 1;
135 let entry = LogEntry::new(term, index, command);
136 self.entries.push_back(entry);
137 index
138 }
139
140 pub fn append_entries(&mut self, entries: Vec<LogEntry>) -> RaftResult<()> {
142 for entry in entries {
143 let expected_index = self.last_index() + 1;
145 if entry.index != expected_index {
146 return Err(RaftError::LogInconsistency);
147 }
148 self.entries.push_back(entry);
149 }
150 Ok(())
151 }
152
153 pub fn truncate_from(&mut self, index: LogIndex) -> RaftResult<()> {
155 if index <= self.base_index {
156 return Err(RaftError::InvalidLogIndex(index));
157 }
158
159 let offset = (index - self.base_index - 1) as usize;
160 self.entries.truncate(offset);
161 Ok(())
162 }
163
164 pub fn matches(&self, index: LogIndex, term: Term) -> bool {
166 if index == 0 {
167 return true;
168 }
169
170 if index == self.base_index {
171 return term == self.base_term;
172 }
173
174 match self.term_at(index) {
175 Some(entry_term) => entry_term == term,
176 None => false,
177 }
178 }
179
180 pub fn install_snapshot(&mut self, snapshot: Snapshot) -> RaftResult<()> {
182 let last_index = snapshot.last_included_index;
183 let last_term = snapshot.last_included_term;
184
185 while let Some(entry) = self.entries.front() {
187 if entry.index <= last_index {
188 self.entries.pop_front();
189 } else {
190 break;
191 }
192 }
193
194 self.base_index = last_index;
195 self.base_term = last_term;
196 self.snapshot = Some(snapshot);
197
198 Ok(())
199 }
200
201 pub fn create_snapshot(
203 &mut self,
204 up_to_index: LogIndex,
205 data: Vec<u8>,
206 configuration: Vec<String>,
207 ) -> RaftResult<Snapshot> {
208 if up_to_index <= self.base_index {
209 return Err(RaftError::InvalidLogIndex(up_to_index));
210 }
211
212 let term = self
213 .term_at(up_to_index)
214 .ok_or(RaftError::InvalidLogIndex(up_to_index))?;
215
216 let snapshot = Snapshot {
217 last_included_index: up_to_index,
218 last_included_term: term,
219 data,
220 configuration,
221 };
222
223 self.install_snapshot(snapshot.clone())?;
225
226 Ok(snapshot)
227 }
228
229 pub fn snapshot(&self) -> Option<&Snapshot> {
231 self.snapshot.as_ref()
232 }
233
234 pub fn len(&self) -> usize {
236 self.entries.len()
237 }
238
239 pub fn is_empty(&self) -> bool {
241 self.entries.is_empty() && self.base_index == 0
242 }
243
244 pub fn base_index(&self) -> LogIndex {
246 self.base_index
247 }
248
249 pub fn base_term(&self) -> Term {
251 self.base_term
252 }
253}
254
255impl Default for RaftLog {
256 fn default() -> Self {
257 Self::new()
258 }
259}
260
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a log holding `cmd1` and `cmd2` in term 1 followed by `cmd3`
    /// in term 2 (indices 1 through 3).
    fn three_entry_log() -> RaftLog {
        let mut log = RaftLog::new();
        log.append(1, b"cmd1".to_vec());
        log.append(1, b"cmd2".to_vec());
        log.append(2, b"cmd3".to_vec());
        log
    }

    /// Appending assigns consecutive indices starting at 1.
    #[test]
    fn test_log_append() {
        let mut log = RaftLog::new();
        assert_eq!(log.last_index(), 0);

        let first = log.append(1, b"cmd1".to_vec());
        assert_eq!(first, 1);
        assert_eq!(log.last_index(), 1);
        assert_eq!(log.last_term(), 1);

        let second = log.append(1, b"cmd2".to_vec());
        assert_eq!(second, 2);
        assert_eq!(log.last_index(), 2);
    }

    /// Lookup returns stored entries and `None` outside the held range.
    #[test]
    fn test_log_get() {
        let log = three_entry_log();

        let entry = log.get(2).unwrap();
        assert_eq!(entry.term, 1);
        assert_eq!(entry.command, b"cmd2");

        assert!(log.get(0).is_none());
        assert!(log.get(10).is_none());
    }

    /// Truncating from an index removes that entry and everything after it.
    #[test]
    fn test_log_truncate() {
        let mut log = three_entry_log();

        log.truncate_from(2).unwrap();
        assert_eq!(log.last_index(), 1);
        assert!(log.get(2).is_none());
    }

    /// `matches` compares the stored term at an index with the given term.
    #[test]
    fn test_log_matches() {
        let log = three_entry_log();

        assert!(log.matches(1, 1));
        assert!(log.matches(2, 1));
        assert!(log.matches(3, 2));
        assert!(!log.matches(3, 1));
        assert!(!log.matches(10, 1));
    }

    /// Creating a snapshot moves the base forward and compacts covered entries.
    #[test]
    fn test_snapshot_creation() {
        let mut log = three_entry_log();

        let snapshot = log
            .create_snapshot(2, b"state".to_vec(), vec!["node1".to_string()])
            .unwrap();

        assert_eq!(snapshot.last_included_index, 2);
        assert_eq!(snapshot.last_included_term, 1);
        assert_eq!(log.base_index(), 2);
        // Only the entry at index 3 remains after compaction.
        assert_eq!(log.len(), 1);
    }

    /// `entries_from` returns the suffix starting at the requested index.
    #[test]
    fn test_entries_from() {
        let log = three_entry_log();

        let suffix = log.entries_from(2);
        assert_eq!(suffix.len(), 2);
        assert_eq!(suffix[0].index, 2);
        assert_eq!(suffix[1].index, 3);
    }
}