ruvector_raft/
log.rs

1//! Raft log implementation
2//!
3//! Manages the replicated log with support for:
4//! - Appending entries
5//! - Truncation and conflict resolution
6//! - Snapshots and compaction
7//! - Persistence
8
9use crate::{LogIndex, RaftError, RaftResult, Term};
10use serde::{Deserialize, Serialize};
11use std::collections::VecDeque;
12
13/// A single entry in the Raft log
14#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
15pub struct LogEntry {
16    /// Term when entry was received by leader
17    pub term: Term,
18
19    /// Index position in the log
20    pub index: LogIndex,
21
22    /// State machine command
23    pub command: Vec<u8>,
24}
25
26impl LogEntry {
27    /// Create a new log entry
28    pub fn new(term: Term, index: LogIndex, command: Vec<u8>) -> Self {
29        Self {
30            term,
31            index,
32            command,
33        }
34    }
35}
36
37/// Snapshot metadata
38#[derive(Debug, Clone, Serialize, Deserialize)]
39pub struct Snapshot {
40    /// Index of last entry in snapshot
41    pub last_included_index: LogIndex,
42
43    /// Term of last entry in snapshot
44    pub last_included_term: Term,
45
46    /// Snapshot data
47    pub data: Vec<u8>,
48
49    /// Configuration at the time of snapshot
50    pub configuration: Vec<String>,
51}
52
53/// The Raft replicated log
54#[derive(Debug, Clone, Serialize, Deserialize)]
55pub struct RaftLog {
56    /// Log entries (index starts at 1)
57    entries: VecDeque<LogEntry>,
58
59    /// Current snapshot (if any)
60    snapshot: Option<Snapshot>,
61
62    /// Base index from snapshot (0 if no snapshot)
63    base_index: LogIndex,
64
65    /// Base term from snapshot (0 if no snapshot)
66    base_term: Term,
67}
68
69impl RaftLog {
70    /// Create a new empty log
71    pub fn new() -> Self {
72        Self {
73            entries: VecDeque::new(),
74            snapshot: None,
75            base_index: 0,
76            base_term: 0,
77        }
78    }
79
80    /// Get the index of the last log entry
81    pub fn last_index(&self) -> LogIndex {
82        if let Some(entry) = self.entries.back() {
83            entry.index
84        } else {
85            self.base_index
86        }
87    }
88
89    /// Get the term of the last log entry
90    pub fn last_term(&self) -> Term {
91        if let Some(entry) = self.entries.back() {
92            entry.term
93        } else {
94            self.base_term
95        }
96    }
97
98    /// Get the term at a specific index
99    pub fn term_at(&self, index: LogIndex) -> Option<Term> {
100        if index == self.base_index {
101            return Some(self.base_term);
102        }
103
104        if index < self.base_index {
105            return None;
106        }
107
108        let offset = (index - self.base_index - 1) as usize;
109        self.entries.get(offset).map(|entry| entry.term)
110    }
111
112    /// Get a log entry at a specific index
113    pub fn get(&self, index: LogIndex) -> Option<&LogEntry> {
114        if index <= self.base_index {
115            return None;
116        }
117
118        let offset = (index - self.base_index - 1) as usize;
119        self.entries.get(offset)
120    }
121
122    /// Get entries starting from an index
123    pub fn entries_from(&self, start_index: LogIndex) -> Vec<LogEntry> {
124        if start_index <= self.base_index {
125            return self.entries.iter().cloned().collect();
126        }
127
128        let offset = (start_index - self.base_index - 1) as usize;
129        self.entries.iter().skip(offset).cloned().collect()
130    }
131
132    /// Append a new entry to the log
133    pub fn append(&mut self, term: Term, command: Vec<u8>) -> LogIndex {
134        let index = self.last_index() + 1;
135        let entry = LogEntry::new(term, index, command);
136        self.entries.push_back(entry);
137        index
138    }
139
140    /// Append multiple entries (for replication)
141    pub fn append_entries(&mut self, entries: Vec<LogEntry>) -> RaftResult<()> {
142        for entry in entries {
143            // Verify index is sequential
144            let expected_index = self.last_index() + 1;
145            if entry.index != expected_index {
146                return Err(RaftError::LogInconsistency);
147            }
148            self.entries.push_back(entry);
149        }
150        Ok(())
151    }
152
153    /// Truncate log from a given index (delete entries >= index)
154    pub fn truncate_from(&mut self, index: LogIndex) -> RaftResult<()> {
155        if index <= self.base_index {
156            return Err(RaftError::InvalidLogIndex(index));
157        }
158
159        let offset = (index - self.base_index - 1) as usize;
160        self.entries.truncate(offset);
161        Ok(())
162    }
163
164    /// Check if log contains an entry at index with the given term
165    pub fn matches(&self, index: LogIndex, term: Term) -> bool {
166        if index == 0 {
167            return true;
168        }
169
170        if index == self.base_index {
171            return term == self.base_term;
172        }
173
174        match self.term_at(index) {
175            Some(entry_term) => entry_term == term,
176            None => false,
177        }
178    }
179
180    /// Install a snapshot and compact the log
181    pub fn install_snapshot(&mut self, snapshot: Snapshot) -> RaftResult<()> {
182        let last_index = snapshot.last_included_index;
183        let last_term = snapshot.last_included_term;
184
185        // Remove all entries up to and including the snapshot's last index
186        while let Some(entry) = self.entries.front() {
187            if entry.index <= last_index {
188                self.entries.pop_front();
189            } else {
190                break;
191            }
192        }
193
194        self.base_index = last_index;
195        self.base_term = last_term;
196        self.snapshot = Some(snapshot);
197
198        Ok(())
199    }
200
201    /// Create a snapshot up to the given index
202    pub fn create_snapshot(
203        &mut self,
204        up_to_index: LogIndex,
205        data: Vec<u8>,
206        configuration: Vec<String>,
207    ) -> RaftResult<Snapshot> {
208        if up_to_index <= self.base_index {
209            return Err(RaftError::InvalidLogIndex(up_to_index));
210        }
211
212        let term = self
213            .term_at(up_to_index)
214            .ok_or(RaftError::InvalidLogIndex(up_to_index))?;
215
216        let snapshot = Snapshot {
217            last_included_index: up_to_index,
218            last_included_term: term,
219            data,
220            configuration,
221        };
222
223        // Compact the log by removing entries before the snapshot
224        self.install_snapshot(snapshot.clone())?;
225
226        Ok(snapshot)
227    }
228
229    /// Get the current snapshot
230    pub fn snapshot(&self) -> Option<&Snapshot> {
231        self.snapshot.as_ref()
232    }
233
234    /// Get the number of entries in memory
235    pub fn len(&self) -> usize {
236        self.entries.len()
237    }
238
239    /// Check if the log is empty
240    pub fn is_empty(&self) -> bool {
241        self.entries.is_empty() && self.base_index == 0
242    }
243
244    /// Get the base index from snapshot
245    pub fn base_index(&self) -> LogIndex {
246        self.base_index
247    }
248
249    /// Get the base term from snapshot
250    pub fn base_term(&self) -> Term {
251        self.base_term
252    }
253}
254
255impl Default for RaftLog {
256    fn default() -> Self {
257        Self::new()
258    }
259}
260
261#[cfg(test)]
262mod tests {
263    use super::*;
264
265    #[test]
266    fn test_log_append() {
267        let mut log = RaftLog::new();
268        assert_eq!(log.last_index(), 0);
269
270        let idx1 = log.append(1, b"cmd1".to_vec());
271        assert_eq!(idx1, 1);
272        assert_eq!(log.last_index(), 1);
273        assert_eq!(log.last_term(), 1);
274
275        let idx2 = log.append(1, b"cmd2".to_vec());
276        assert_eq!(idx2, 2);
277        assert_eq!(log.last_index(), 2);
278    }
279
280    #[test]
281    fn test_log_get() {
282        let mut log = RaftLog::new();
283        log.append(1, b"cmd1".to_vec());
284        log.append(1, b"cmd2".to_vec());
285        log.append(2, b"cmd3".to_vec());
286
287        let entry = log.get(2).unwrap();
288        assert_eq!(entry.term, 1);
289        assert_eq!(entry.command, b"cmd2");
290
291        assert!(log.get(0).is_none());
292        assert!(log.get(10).is_none());
293    }
294
295    #[test]
296    fn test_log_truncate() {
297        let mut log = RaftLog::new();
298        log.append(1, b"cmd1".to_vec());
299        log.append(1, b"cmd2".to_vec());
300        log.append(2, b"cmd3".to_vec());
301
302        log.truncate_from(2).unwrap();
303        assert_eq!(log.last_index(), 1);
304        assert!(log.get(2).is_none());
305    }
306
307    #[test]
308    fn test_log_matches() {
309        let mut log = RaftLog::new();
310        log.append(1, b"cmd1".to_vec());
311        log.append(1, b"cmd2".to_vec());
312        log.append(2, b"cmd3".to_vec());
313
314        assert!(log.matches(1, 1));
315        assert!(log.matches(2, 1));
316        assert!(log.matches(3, 2));
317        assert!(!log.matches(3, 1));
318        assert!(!log.matches(10, 1));
319    }
320
321    #[test]
322    fn test_snapshot_creation() {
323        let mut log = RaftLog::new();
324        log.append(1, b"cmd1".to_vec());
325        log.append(1, b"cmd2".to_vec());
326        log.append(2, b"cmd3".to_vec());
327
328        let snapshot = log
329            .create_snapshot(2, b"state".to_vec(), vec!["node1".to_string()])
330            .unwrap();
331
332        assert_eq!(snapshot.last_included_index, 2);
333        assert_eq!(snapshot.last_included_term, 1);
334        assert_eq!(log.base_index(), 2);
335        assert_eq!(log.len(), 1); // Only entry 3 remains
336    }
337
338    #[test]
339    fn test_entries_from() {
340        let mut log = RaftLog::new();
341        log.append(1, b"cmd1".to_vec());
342        log.append(1, b"cmd2".to_vec());
343        log.append(2, b"cmd3".to_vec());
344
345        let entries = log.entries_from(2);
346        assert_eq!(entries.len(), 2);
347        assert_eq!(entries[0].index, 2);
348        assert_eq!(entries[1].index, 3);
349    }
350}