amaters_cluster/
log.rs

1//! Log management for Raft consensus
2
3use crate::error::{RaftError, RaftResult};
4use crate::types::{LogIndex, Term};
5use std::collections::VecDeque;
6
7/// A command to be replicated in the log
8#[derive(Debug, Clone, PartialEq, Eq)]
9pub struct Command {
10    /// The command data
11    pub data: Vec<u8>,
12}
13
14impl Command {
15    /// Create a new command
16    pub fn new(data: Vec<u8>) -> Self {
17        Self { data }
18    }
19
20    /// Create a command from a string
21    #[allow(clippy::should_implement_trait)]
22    pub fn from_str(s: &str) -> Self {
23        Self::new(s.as_bytes().to_vec())
24    }
25}
26
27/// A log entry in the Raft log
28#[derive(Debug, Clone, PartialEq, Eq)]
29pub struct LogEntry {
30    /// The term when this entry was created
31    pub term: Term,
32    /// The index of this entry in the log (1-indexed)
33    pub index: LogIndex,
34    /// The command to be applied to the state machine
35    pub command: Command,
36}
37
38impl LogEntry {
39    /// Create a new log entry
40    pub fn new(term: Term, index: LogIndex, command: Command) -> Self {
41        Self {
42            term,
43            index,
44            command,
45        }
46    }
47}
48
49/// In-memory log with persistent backing
50pub struct RaftLog {
51    /// In-memory cache of log entries
52    entries: VecDeque<LogEntry>,
53    /// Index of the first entry in the cache (1-indexed)
54    /// If cache is empty, this is last_index + 1
55    first_index: LogIndex,
56    /// Index of the last entry in the log
57    last_index: LogIndex,
58    /// Term of the last entry in the log
59    last_term: Term,
60    /// Index of the highest log entry known to be committed
61    commit_index: LogIndex,
62    /// Index of the highest log entry applied to state machine
63    applied_index: LogIndex,
64    /// Snapshot metadata (index and term of last included entry)
65    snapshot_index: LogIndex,
66    snapshot_term: Term,
67}
68
69impl RaftLog {
70    /// Create a new empty log
71    pub fn new() -> Self {
72        Self {
73            entries: VecDeque::new(),
74            first_index: 1,
75            last_index: 0,
76            last_term: 0,
77            commit_index: 0,
78            applied_index: 0,
79            snapshot_index: 0,
80            snapshot_term: 0,
81        }
82    }
83
84    /// Append a new entry to the log
85    pub fn append(&mut self, term: Term, command: Command) -> LogIndex {
86        let index = self.last_index + 1;
87        let entry = LogEntry::new(term, index, command);
88
89        self.entries.push_back(entry);
90        self.last_index = index;
91        self.last_term = term;
92
93        index
94    }
95
96    /// Append multiple entries to the log
97    pub fn append_entries(&mut self, entries: Vec<LogEntry>) -> RaftResult<()> {
98        if entries.is_empty() {
99            return Ok(());
100        }
101
102        // Verify entries are sequential
103        let mut expected_index = self.last_index + 1;
104        for entry in &entries {
105            if entry.index != expected_index {
106                return Err(RaftError::LogInconsistency {
107                    reason: format!("Expected index {}, got {}", expected_index, entry.index),
108                });
109            }
110            expected_index += 1;
111        }
112
113        // Append all entries
114        for entry in entries {
115            self.last_index = entry.index;
116            self.last_term = entry.term;
117            self.entries.push_back(entry);
118        }
119
120        Ok(())
121    }
122
123    /// Get an entry by index
124    pub fn get(&self, index: LogIndex) -> Option<&LogEntry> {
125        if index < self.first_index || index > self.last_index {
126            return None;
127        }
128
129        let offset = (index - self.first_index) as usize;
130        self.entries.get(offset)
131    }
132
133    /// Get entries starting from a given index
134    pub fn get_entries_from(&self, start_index: LogIndex, max_count: usize) -> Vec<LogEntry> {
135        if start_index < self.first_index || start_index > self.last_index {
136            return Vec::new();
137        }
138
139        let offset = (start_index - self.first_index) as usize;
140        self.entries
141            .iter()
142            .skip(offset)
143            .take(max_count)
144            .cloned()
145            .collect()
146    }
147
148    /// Get the term of an entry by index
149    pub fn get_term(&self, index: LogIndex) -> Option<Term> {
150        if index == 0 {
151            return Some(0);
152        }
153
154        if index == self.snapshot_index {
155            return Some(self.snapshot_term);
156        }
157
158        self.get(index).map(|entry| entry.term)
159    }
160
161    /// Get the index of the last entry
162    pub fn last_index(&self) -> LogIndex {
163        self.last_index
164    }
165
166    /// Get the term of the last entry
167    pub fn last_term(&self) -> Term {
168        self.last_term
169    }
170
171    /// Delete entries from a given index onwards
172    pub fn truncate_from(&mut self, from_index: LogIndex) -> RaftResult<()> {
173        if from_index <= self.snapshot_index {
174            return Err(RaftError::LogInconsistency {
175                reason: format!(
176                    "Cannot truncate before snapshot index {}",
177                    self.snapshot_index
178                ),
179            });
180        }
181
182        if from_index > self.last_index {
183            return Ok(());
184        }
185
186        // Calculate how many entries to remove
187        let offset = (from_index - self.first_index) as usize;
188        self.entries.truncate(offset);
189
190        // Update last index and term
191        if let Some(last_entry) = self.entries.back() {
192            self.last_index = last_entry.index;
193            self.last_term = last_entry.term;
194        } else {
195            self.last_index = self.snapshot_index;
196            self.last_term = self.snapshot_term;
197        }
198
199        Ok(())
200    }
201
202    /// Check if the log contains an entry at the given index with the given term
203    pub fn matches(&self, index: LogIndex, term: Term) -> bool {
204        if index == 0 {
205            return term == 0;
206        }
207
208        if index == self.snapshot_index {
209            return term == self.snapshot_term;
210        }
211
212        match self.get_term(index) {
213            Some(t) => t == term,
214            None => false,
215        }
216    }
217
218    /// Get the commit index
219    pub fn commit_index(&self) -> LogIndex {
220        self.commit_index
221    }
222
223    /// Set the commit index (must be monotonically increasing)
224    pub fn set_commit_index(&mut self, index: LogIndex) -> RaftResult<()> {
225        if index < self.commit_index {
226            return Err(RaftError::LogInconsistency {
227                reason: format!(
228                    "Cannot decrease commit index from {} to {}",
229                    self.commit_index, index
230                ),
231            });
232        }
233
234        if index > self.last_index {
235            return Err(RaftError::LogInconsistency {
236                reason: format!(
237                    "Cannot commit beyond last index {} (tried to commit {})",
238                    self.last_index, index
239                ),
240            });
241        }
242
243        self.commit_index = index;
244        Ok(())
245    }
246
247    /// Get the applied index
248    pub fn applied_index(&self) -> LogIndex {
249        self.applied_index
250    }
251
252    /// Set the applied index (must be monotonically increasing)
253    pub fn set_applied_index(&mut self, index: LogIndex) -> RaftResult<()> {
254        if index < self.applied_index {
255            return Err(RaftError::LogInconsistency {
256                reason: format!(
257                    "Cannot decrease applied index from {} to {}",
258                    self.applied_index, index
259                ),
260            });
261        }
262
263        if index > self.commit_index {
264            return Err(RaftError::LogInconsistency {
265                reason: format!(
266                    "Cannot apply beyond commit index {} (tried to apply {})",
267                    self.commit_index, index
268                ),
269            });
270        }
271
272        self.applied_index = index;
273        Ok(())
274    }
275
276    /// Get entries that are committed but not yet applied
277    pub fn get_uncommitted_entries(&self) -> Vec<LogEntry> {
278        if self.applied_index >= self.commit_index {
279            return Vec::new();
280        }
281
282        self.get_entries_from(self.applied_index + 1, usize::MAX)
283            .into_iter()
284            .take_while(|entry| entry.index <= self.commit_index)
285            .collect()
286    }
287
288    /// Check if the log is empty
289    pub fn is_empty(&self) -> bool {
290        self.entries.is_empty()
291    }
292
293    /// Get the number of entries in the log
294    pub fn len(&self) -> usize {
295        self.entries.len()
296    }
297}
298
299impl Default for RaftLog {
300    fn default() -> Self {
301        Self::new()
302    }
303}
304
305#[cfg(test)]
306mod tests {
307    use super::*;
308
309    #[test]
310    fn test_new_log() {
311        let log = RaftLog::new();
312        assert_eq!(log.last_index(), 0);
313        assert_eq!(log.last_term(), 0);
314        assert_eq!(log.commit_index(), 0);
315        assert_eq!(log.applied_index(), 0);
316        assert!(log.is_empty());
317    }
318
319    #[test]
320    fn test_append_entry() {
321        let mut log = RaftLog::new();
322        let cmd = Command::from_str("test");
323
324        let index = log.append(1, cmd.clone());
325        assert_eq!(index, 1);
326        assert_eq!(log.last_index(), 1);
327        assert_eq!(log.last_term(), 1);
328        assert_eq!(log.len(), 1);
329
330        let entry = log.get(1).expect("Entry should exist");
331        assert_eq!(entry.index, 1);
332        assert_eq!(entry.term, 1);
333        assert_eq!(entry.command, cmd);
334    }
335
336    #[test]
337    fn test_append_multiple_entries() {
338        let mut log = RaftLog::new();
339        log.append(1, Command::from_str("cmd1"));
340        log.append(1, Command::from_str("cmd2"));
341        log.append(2, Command::from_str("cmd3"));
342
343        assert_eq!(log.last_index(), 3);
344        assert_eq!(log.last_term(), 2);
345        assert_eq!(log.len(), 3);
346    }
347
348    #[test]
349    fn test_get_entries_from() {
350        let mut log = RaftLog::new();
351        log.append(1, Command::from_str("cmd1"));
352        log.append(1, Command::from_str("cmd2"));
353        log.append(2, Command::from_str("cmd3"));
354
355        let entries = log.get_entries_from(2, 10);
356        assert_eq!(entries.len(), 2);
357        assert_eq!(entries[0].index, 2);
358        assert_eq!(entries[1].index, 3);
359    }
360
361    #[test]
362    fn test_get_entries_from_with_limit() {
363        let mut log = RaftLog::new();
364        log.append(1, Command::from_str("cmd1"));
365        log.append(1, Command::from_str("cmd2"));
366        log.append(2, Command::from_str("cmd3"));
367
368        let entries = log.get_entries_from(1, 2);
369        assert_eq!(entries.len(), 2);
370        assert_eq!(entries[0].index, 1);
371        assert_eq!(entries[1].index, 2);
372    }
373
374    #[test]
375    fn test_truncate_from() {
376        let mut log = RaftLog::new();
377        log.append(1, Command::from_str("cmd1"));
378        log.append(1, Command::from_str("cmd2"));
379        log.append(2, Command::from_str("cmd3"));
380
381        log.truncate_from(2).expect("Truncate should succeed");
382
383        assert_eq!(log.last_index(), 1);
384        assert_eq!(log.last_term(), 1);
385        assert_eq!(log.len(), 1);
386        assert!(log.get(2).is_none());
387        assert!(log.get(3).is_none());
388    }
389
390    #[test]
391    fn test_matches() {
392        let mut log = RaftLog::new();
393        log.append(1, Command::from_str("cmd1"));
394        log.append(1, Command::from_str("cmd2"));
395        log.append(2, Command::from_str("cmd3"));
396
397        assert!(log.matches(1, 1));
398        assert!(log.matches(2, 1));
399        assert!(log.matches(3, 2));
400        assert!(!log.matches(3, 1));
401        assert!(!log.matches(4, 2));
402    }
403
404    #[test]
405    fn test_commit_index() {
406        let mut log = RaftLog::new();
407        log.append(1, Command::from_str("cmd1"));
408        log.append(1, Command::from_str("cmd2"));
409        log.append(2, Command::from_str("cmd3"));
410
411        assert_eq!(log.commit_index(), 0);
412
413        log.set_commit_index(2).expect("Set commit should succeed");
414        assert_eq!(log.commit_index(), 2);
415
416        // Cannot decrease commit index
417        let result = log.set_commit_index(1);
418        assert!(result.is_err());
419    }
420
421    #[test]
422    fn test_applied_index() {
423        let mut log = RaftLog::new();
424        log.append(1, Command::from_str("cmd1"));
425        log.append(1, Command::from_str("cmd2"));
426        log.set_commit_index(2).expect("Set commit should succeed");
427
428        assert_eq!(log.applied_index(), 0);
429
430        log.set_applied_index(1)
431            .expect("Set applied should succeed");
432        assert_eq!(log.applied_index(), 1);
433
434        // Cannot apply beyond commit index
435        let result = log.set_applied_index(3);
436        assert!(result.is_err());
437    }
438
439    #[test]
440    fn test_get_uncommitted_entries() {
441        let mut log = RaftLog::new();
442        log.append(1, Command::from_str("cmd1"));
443        log.append(1, Command::from_str("cmd2"));
444        log.append(2, Command::from_str("cmd3"));
445        log.set_commit_index(2).expect("Set commit should succeed");
446
447        let entries = log.get_uncommitted_entries();
448        assert_eq!(entries.len(), 2);
449        assert_eq!(entries[0].index, 1);
450        assert_eq!(entries[1].index, 2);
451
452        log.set_applied_index(1)
453            .expect("Set applied should succeed");
454        let entries = log.get_uncommitted_entries();
455        assert_eq!(entries.len(), 1);
456        assert_eq!(entries[0].index, 2);
457    }
458}