1use crate::error::{RaftError, RaftResult};
4use crate::types::{LogIndex, Term};
5use std::collections::VecDeque;
6
7#[derive(Debug, Clone, PartialEq, Eq)]
9pub struct Command {
10 pub data: Vec<u8>,
12}
13
14impl Command {
15 pub fn new(data: Vec<u8>) -> Self {
17 Self { data }
18 }
19
20 #[allow(clippy::should_implement_trait)]
22 pub fn from_str(s: &str) -> Self {
23 Self::new(s.as_bytes().to_vec())
24 }
25}
26
27#[derive(Debug, Clone, PartialEq, Eq)]
29pub struct LogEntry {
30 pub term: Term,
32 pub index: LogIndex,
34 pub command: Command,
36}
37
38impl LogEntry {
39 pub fn new(term: Term, index: LogIndex, command: Command) -> Self {
41 Self {
42 term,
43 index,
44 command,
45 }
46 }
47}
48
49pub struct RaftLog {
51 entries: VecDeque<LogEntry>,
53 first_index: LogIndex,
56 last_index: LogIndex,
58 last_term: Term,
60 commit_index: LogIndex,
62 applied_index: LogIndex,
64 snapshot_index: LogIndex,
66 snapshot_term: Term,
67}
68
69impl RaftLog {
70 pub fn new() -> Self {
72 Self {
73 entries: VecDeque::new(),
74 first_index: 1,
75 last_index: 0,
76 last_term: 0,
77 commit_index: 0,
78 applied_index: 0,
79 snapshot_index: 0,
80 snapshot_term: 0,
81 }
82 }
83
84 pub fn append(&mut self, term: Term, command: Command) -> LogIndex {
86 let index = self.last_index + 1;
87 let entry = LogEntry::new(term, index, command);
88
89 self.entries.push_back(entry);
90 self.last_index = index;
91 self.last_term = term;
92
93 index
94 }
95
96 pub fn append_entries(&mut self, entries: Vec<LogEntry>) -> RaftResult<()> {
98 if entries.is_empty() {
99 return Ok(());
100 }
101
102 let mut expected_index = self.last_index + 1;
104 for entry in &entries {
105 if entry.index != expected_index {
106 return Err(RaftError::LogInconsistency {
107 reason: format!("Expected index {}, got {}", expected_index, entry.index),
108 });
109 }
110 expected_index += 1;
111 }
112
113 for entry in entries {
115 self.last_index = entry.index;
116 self.last_term = entry.term;
117 self.entries.push_back(entry);
118 }
119
120 Ok(())
121 }
122
123 pub fn get(&self, index: LogIndex) -> Option<&LogEntry> {
125 if index < self.first_index || index > self.last_index {
126 return None;
127 }
128
129 let offset = (index - self.first_index) as usize;
130 self.entries.get(offset)
131 }
132
133 pub fn get_entries_from(&self, start_index: LogIndex, max_count: usize) -> Vec<LogEntry> {
135 if start_index < self.first_index || start_index > self.last_index {
136 return Vec::new();
137 }
138
139 let offset = (start_index - self.first_index) as usize;
140 self.entries
141 .iter()
142 .skip(offset)
143 .take(max_count)
144 .cloned()
145 .collect()
146 }
147
148 pub fn get_term(&self, index: LogIndex) -> Option<Term> {
150 if index == 0 {
151 return Some(0);
152 }
153
154 if index == self.snapshot_index {
155 return Some(self.snapshot_term);
156 }
157
158 self.get(index).map(|entry| entry.term)
159 }
160
161 pub fn last_index(&self) -> LogIndex {
163 self.last_index
164 }
165
166 pub fn last_term(&self) -> Term {
168 self.last_term
169 }
170
171 pub fn truncate_from(&mut self, from_index: LogIndex) -> RaftResult<()> {
173 if from_index <= self.snapshot_index {
174 return Err(RaftError::LogInconsistency {
175 reason: format!(
176 "Cannot truncate before snapshot index {}",
177 self.snapshot_index
178 ),
179 });
180 }
181
182 if from_index > self.last_index {
183 return Ok(());
184 }
185
186 let offset = (from_index - self.first_index) as usize;
188 self.entries.truncate(offset);
189
190 if let Some(last_entry) = self.entries.back() {
192 self.last_index = last_entry.index;
193 self.last_term = last_entry.term;
194 } else {
195 self.last_index = self.snapshot_index;
196 self.last_term = self.snapshot_term;
197 }
198
199 Ok(())
200 }
201
202 pub fn matches(&self, index: LogIndex, term: Term) -> bool {
204 if index == 0 {
205 return term == 0;
206 }
207
208 if index == self.snapshot_index {
209 return term == self.snapshot_term;
210 }
211
212 match self.get_term(index) {
213 Some(t) => t == term,
214 None => false,
215 }
216 }
217
218 pub fn commit_index(&self) -> LogIndex {
220 self.commit_index
221 }
222
223 pub fn set_commit_index(&mut self, index: LogIndex) -> RaftResult<()> {
225 if index < self.commit_index {
226 return Err(RaftError::LogInconsistency {
227 reason: format!(
228 "Cannot decrease commit index from {} to {}",
229 self.commit_index, index
230 ),
231 });
232 }
233
234 if index > self.last_index {
235 return Err(RaftError::LogInconsistency {
236 reason: format!(
237 "Cannot commit beyond last index {} (tried to commit {})",
238 self.last_index, index
239 ),
240 });
241 }
242
243 self.commit_index = index;
244 Ok(())
245 }
246
247 pub fn applied_index(&self) -> LogIndex {
249 self.applied_index
250 }
251
252 pub fn set_applied_index(&mut self, index: LogIndex) -> RaftResult<()> {
254 if index < self.applied_index {
255 return Err(RaftError::LogInconsistency {
256 reason: format!(
257 "Cannot decrease applied index from {} to {}",
258 self.applied_index, index
259 ),
260 });
261 }
262
263 if index > self.commit_index {
264 return Err(RaftError::LogInconsistency {
265 reason: format!(
266 "Cannot apply beyond commit index {} (tried to apply {})",
267 self.commit_index, index
268 ),
269 });
270 }
271
272 self.applied_index = index;
273 Ok(())
274 }
275
276 pub fn get_uncommitted_entries(&self) -> Vec<LogEntry> {
278 if self.applied_index >= self.commit_index {
279 return Vec::new();
280 }
281
282 self.get_entries_from(self.applied_index + 1, usize::MAX)
283 .into_iter()
284 .take_while(|entry| entry.index <= self.commit_index)
285 .collect()
286 }
287
288 pub fn is_empty(&self) -> bool {
290 self.entries.is_empty()
291 }
292
293 pub fn len(&self) -> usize {
295 self.entries.len()
296 }
297}
298
299impl Default for RaftLog {
300 fn default() -> Self {
301 Self::new()
302 }
303}
304
305#[cfg(test)]
306mod tests {
307 use super::*;
308
309 #[test]
310 fn test_new_log() {
311 let log = RaftLog::new();
312 assert_eq!(log.last_index(), 0);
313 assert_eq!(log.last_term(), 0);
314 assert_eq!(log.commit_index(), 0);
315 assert_eq!(log.applied_index(), 0);
316 assert!(log.is_empty());
317 }
318
319 #[test]
320 fn test_append_entry() {
321 let mut log = RaftLog::new();
322 let cmd = Command::from_str("test");
323
324 let index = log.append(1, cmd.clone());
325 assert_eq!(index, 1);
326 assert_eq!(log.last_index(), 1);
327 assert_eq!(log.last_term(), 1);
328 assert_eq!(log.len(), 1);
329
330 let entry = log.get(1).expect("Entry should exist");
331 assert_eq!(entry.index, 1);
332 assert_eq!(entry.term, 1);
333 assert_eq!(entry.command, cmd);
334 }
335
336 #[test]
337 fn test_append_multiple_entries() {
338 let mut log = RaftLog::new();
339 log.append(1, Command::from_str("cmd1"));
340 log.append(1, Command::from_str("cmd2"));
341 log.append(2, Command::from_str("cmd3"));
342
343 assert_eq!(log.last_index(), 3);
344 assert_eq!(log.last_term(), 2);
345 assert_eq!(log.len(), 3);
346 }
347
348 #[test]
349 fn test_get_entries_from() {
350 let mut log = RaftLog::new();
351 log.append(1, Command::from_str("cmd1"));
352 log.append(1, Command::from_str("cmd2"));
353 log.append(2, Command::from_str("cmd3"));
354
355 let entries = log.get_entries_from(2, 10);
356 assert_eq!(entries.len(), 2);
357 assert_eq!(entries[0].index, 2);
358 assert_eq!(entries[1].index, 3);
359 }
360
361 #[test]
362 fn test_get_entries_from_with_limit() {
363 let mut log = RaftLog::new();
364 log.append(1, Command::from_str("cmd1"));
365 log.append(1, Command::from_str("cmd2"));
366 log.append(2, Command::from_str("cmd3"));
367
368 let entries = log.get_entries_from(1, 2);
369 assert_eq!(entries.len(), 2);
370 assert_eq!(entries[0].index, 1);
371 assert_eq!(entries[1].index, 2);
372 }
373
374 #[test]
375 fn test_truncate_from() {
376 let mut log = RaftLog::new();
377 log.append(1, Command::from_str("cmd1"));
378 log.append(1, Command::from_str("cmd2"));
379 log.append(2, Command::from_str("cmd3"));
380
381 log.truncate_from(2).expect("Truncate should succeed");
382
383 assert_eq!(log.last_index(), 1);
384 assert_eq!(log.last_term(), 1);
385 assert_eq!(log.len(), 1);
386 assert!(log.get(2).is_none());
387 assert!(log.get(3).is_none());
388 }
389
390 #[test]
391 fn test_matches() {
392 let mut log = RaftLog::new();
393 log.append(1, Command::from_str("cmd1"));
394 log.append(1, Command::from_str("cmd2"));
395 log.append(2, Command::from_str("cmd3"));
396
397 assert!(log.matches(1, 1));
398 assert!(log.matches(2, 1));
399 assert!(log.matches(3, 2));
400 assert!(!log.matches(3, 1));
401 assert!(!log.matches(4, 2));
402 }
403
404 #[test]
405 fn test_commit_index() {
406 let mut log = RaftLog::new();
407 log.append(1, Command::from_str("cmd1"));
408 log.append(1, Command::from_str("cmd2"));
409 log.append(2, Command::from_str("cmd3"));
410
411 assert_eq!(log.commit_index(), 0);
412
413 log.set_commit_index(2).expect("Set commit should succeed");
414 assert_eq!(log.commit_index(), 2);
415
416 let result = log.set_commit_index(1);
418 assert!(result.is_err());
419 }
420
421 #[test]
422 fn test_applied_index() {
423 let mut log = RaftLog::new();
424 log.append(1, Command::from_str("cmd1"));
425 log.append(1, Command::from_str("cmd2"));
426 log.set_commit_index(2).expect("Set commit should succeed");
427
428 assert_eq!(log.applied_index(), 0);
429
430 log.set_applied_index(1)
431 .expect("Set applied should succeed");
432 assert_eq!(log.applied_index(), 1);
433
434 let result = log.set_applied_index(3);
436 assert!(result.is_err());
437 }
438
439 #[test]
440 fn test_get_uncommitted_entries() {
441 let mut log = RaftLog::new();
442 log.append(1, Command::from_str("cmd1"));
443 log.append(1, Command::from_str("cmd2"));
444 log.append(2, Command::from_str("cmd3"));
445 log.set_commit_index(2).expect("Set commit should succeed");
446
447 let entries = log.get_uncommitted_entries();
448 assert_eq!(entries.len(), 2);
449 assert_eq!(entries[0].index, 1);
450 assert_eq!(entries[1].index, 2);
451
452 log.set_applied_index(1)
453 .expect("Set applied should succeed");
454 let entries = log.get_uncommitted_entries();
455 assert_eq!(entries.len(), 1);
456 assert_eq!(entries[0].index, 2);
457 }
458}