Skip to main content

aegis_replication/
log.rs

1//! Aegis Replication Log
2//!
3//! Replicated log for Raft consensus.
4//!
5//! @version 0.1.0
6//! @author AutomataNexus Development Team
7
8use serde::{Deserialize, Serialize};
9use std::collections::VecDeque;
10use std::sync::RwLock;
11
12// =============================================================================
13// Log Index
14// =============================================================================
15
16/// Index in the replicated log.
17pub type LogIndex = u64;
18
19/// Term number for Raft consensus.
20pub type Term = u64;
21
22// =============================================================================
23// Log Entry
24// =============================================================================
25
26/// An entry in the replicated log.
27#[derive(Debug, Clone, Serialize, Deserialize)]
28pub struct LogEntry {
29    pub index: LogIndex,
30    pub term: Term,
31    pub entry_type: EntryType,
32    pub data: Vec<u8>,
33    pub timestamp: u64,
34}
35
36impl LogEntry {
37    /// Create a new log entry.
38    pub fn new(index: LogIndex, term: Term, entry_type: EntryType, data: Vec<u8>) -> Self {
39        Self {
40            index,
41            term,
42            entry_type,
43            data,
44            timestamp: current_timestamp(),
45        }
46    }
47
48    /// Create a command entry.
49    pub fn command(index: LogIndex, term: Term, data: Vec<u8>) -> Self {
50        Self::new(index, term, EntryType::Command, data)
51    }
52
53    /// Create a configuration change entry.
54    pub fn config_change(index: LogIndex, term: Term, data: Vec<u8>) -> Self {
55        Self::new(index, term, EntryType::ConfigChange, data)
56    }
57
58    /// Create a no-op entry.
59    pub fn noop(index: LogIndex, term: Term) -> Self {
60        Self::new(index, term, EntryType::NoOp, Vec::new())
61    }
62}
63
64// =============================================================================
65// Entry Type
66// =============================================================================
67
68/// Type of log entry.
69#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
70pub enum EntryType {
71    /// A state machine command.
72    Command,
73    /// A cluster configuration change.
74    ConfigChange,
75    /// A no-op entry for leader confirmation.
76    NoOp,
77}
78
79// =============================================================================
80// Replicated Log
81// =============================================================================
82
83/// The replicated log for Raft consensus.
84pub struct ReplicatedLog {
85    entries: RwLock<VecDeque<LogEntry>>,
86    first_index: RwLock<LogIndex>,
87    commit_index: RwLock<LogIndex>,
88    last_applied: RwLock<LogIndex>,
89    snapshot_index: RwLock<LogIndex>,
90    snapshot_term: RwLock<Term>,
91}
92
93impl ReplicatedLog {
94    /// Create a new replicated log.
95    pub fn new() -> Self {
96        Self {
97            entries: RwLock::new(VecDeque::new()),
98            first_index: RwLock::new(1),
99            commit_index: RwLock::new(0),
100            last_applied: RwLock::new(0),
101            snapshot_index: RwLock::new(0),
102            snapshot_term: RwLock::new(0),
103        }
104    }
105
106    /// Append an entry to the log.
107    pub fn append(&self, entry: LogEntry) -> LogIndex {
108        let mut entries = self.entries.write().expect("log entries lock poisoned");
109        let index = entry.index;
110        entries.push_back(entry);
111        index
112    }
113
114    /// Append multiple entries.
115    pub fn append_entries(&self, new_entries: Vec<LogEntry>) -> LogIndex {
116        let mut entries = self.entries.write().expect("log entries lock poisoned");
117        let mut last_index = 0;
118
119        for entry in new_entries {
120            last_index = entry.index;
121            entries.push_back(entry);
122        }
123
124        last_index
125    }
126
127    /// Get an entry by index.
128    pub fn get(&self, index: LogIndex) -> Option<LogEntry> {
129        let entries = self.entries.read().expect("log entries lock poisoned");
130        let first = *self
131            .first_index
132            .read()
133            .expect("log first_index lock poisoned");
134
135        if index < first {
136            return None;
137        }
138
139        let offset = (index - first) as usize;
140        entries.get(offset).cloned()
141    }
142
143    /// Get entries in a range.
144    pub fn get_range(&self, start: LogIndex, end: LogIndex) -> Vec<LogEntry> {
145        let entries = self.entries.read().expect("log entries lock poisoned");
146        let first = *self
147            .first_index
148            .read()
149            .expect("log first_index lock poisoned");
150
151        if start < first {
152            return Vec::new();
153        }
154
155        let start_offset = (start - first) as usize;
156        let end_offset = (end - first) as usize;
157
158        entries
159            .iter()
160            .skip(start_offset)
161            .take(end_offset.saturating_sub(start_offset))
162            .cloned()
163            .collect()
164    }
165
166    /// Get the last log index.
167    pub fn last_index(&self) -> LogIndex {
168        let entries = self.entries.read().expect("log entries lock poisoned");
169        let first = *self
170            .first_index
171            .read()
172            .expect("log first_index lock poisoned");
173
174        if entries.is_empty() {
175            let snapshot = *self
176                .snapshot_index
177                .read()
178                .expect("log snapshot_index lock poisoned");
179            return snapshot;
180        }
181
182        first + entries.len() as u64 - 1
183    }
184
185    /// Get the term of the last entry.
186    pub fn last_term(&self) -> Term {
187        let entries = self.entries.read().expect("log entries lock poisoned");
188
189        if let Some(entry) = entries.back() {
190            return entry.term;
191        }
192
193        *self
194            .snapshot_term
195            .read()
196            .expect("log snapshot_term lock poisoned")
197    }
198
199    /// Get the term of an entry at a specific index.
200    pub fn term_at(&self, index: LogIndex) -> Option<Term> {
201        if index == 0 {
202            return Some(0);
203        }
204
205        let snapshot_index = *self
206            .snapshot_index
207            .read()
208            .expect("log snapshot_index lock poisoned");
209        if index == snapshot_index {
210            return Some(
211                *self
212                    .snapshot_term
213                    .read()
214                    .expect("log snapshot_term lock poisoned"),
215            );
216        }
217
218        self.get(index).map(|e| e.term)
219    }
220
221    /// Get the commit index.
222    pub fn commit_index(&self) -> LogIndex {
223        *self
224            .commit_index
225            .read()
226            .expect("log commit_index lock poisoned")
227    }
228
229    /// Set the commit index.
230    pub fn set_commit_index(&self, index: LogIndex) {
231        let mut commit = self
232            .commit_index
233            .write()
234            .expect("log commit_index lock poisoned");
235        if index > *commit {
236            *commit = index;
237        }
238    }
239
240    /// Get the last applied index.
241    pub fn last_applied(&self) -> LogIndex {
242        *self
243            .last_applied
244            .read()
245            .expect("log last_applied lock poisoned")
246    }
247
248    /// Set the last applied index.
249    pub fn set_last_applied(&self, index: LogIndex) {
250        let mut applied = self
251            .last_applied
252            .write()
253            .expect("log last_applied lock poisoned");
254        *applied = index;
255    }
256
257    /// Check if there are entries to apply.
258    pub fn has_entries_to_apply(&self) -> bool {
259        let commit = self.commit_index();
260        let applied = self.last_applied();
261        commit > applied
262    }
263
264    /// Get the next entry to apply.
265    pub fn next_to_apply(&self) -> Option<LogEntry> {
266        let commit = self.commit_index();
267        let applied = self.last_applied();
268
269        if commit > applied {
270            self.get(applied + 1)
271        } else {
272            None
273        }
274    }
275
276    /// Truncate the log from a given index (inclusive).
277    pub fn truncate_from(&self, index: LogIndex) {
278        let mut entries = self.entries.write().expect("log entries lock poisoned");
279        let first = *self
280            .first_index
281            .read()
282            .expect("log first_index lock poisoned");
283
284        if index < first {
285            entries.clear();
286            return;
287        }
288
289        let offset = (index - first) as usize;
290        entries.truncate(offset);
291    }
292
293    /// Compact the log up to a given index.
294    pub fn compact(&self, up_to: LogIndex, term: Term) {
295        let mut entries = self.entries.write().expect("log entries lock poisoned");
296        let first = *self
297            .first_index
298            .read()
299            .expect("log first_index lock poisoned");
300
301        if up_to < first {
302            return;
303        }
304
305        let remove_count = (up_to - first + 1) as usize;
306        for _ in 0..remove_count.min(entries.len()) {
307            entries.pop_front();
308        }
309
310        *self
311            .first_index
312            .write()
313            .expect("log first_index lock poisoned") = up_to + 1;
314        *self
315            .snapshot_index
316            .write()
317            .expect("log snapshot_index lock poisoned") = up_to;
318        *self
319            .snapshot_term
320            .write()
321            .expect("log snapshot_term lock poisoned") = term;
322    }
323
324    /// Get the length of the log.
325    pub fn len(&self) -> usize {
326        let entries = self.entries.read().expect("log entries lock poisoned");
327        entries.len()
328    }
329
330    /// Check if the log is empty.
331    pub fn is_empty(&self) -> bool {
332        self.len() == 0
333    }
334
335    /// Check if a log is up-to-date compared to this log.
336    pub fn is_up_to_date(&self, last_log_index: LogIndex, last_log_term: Term) -> bool {
337        let our_last_term = self.last_term();
338        let our_last_index = self.last_index();
339
340        if last_log_term > our_last_term {
341            return true;
342        }
343
344        if last_log_term == our_last_term && last_log_index >= our_last_index {
345            return true;
346        }
347
348        false
349    }
350
351    /// Find the conflict point when appending entries.
352    pub fn find_conflict(&self, entries: &[LogEntry]) -> Option<LogIndex> {
353        for entry in entries {
354            if let Some(term) = self.term_at(entry.index) {
355                if term != entry.term {
356                    return Some(entry.index);
357                }
358            }
359        }
360        None
361    }
362}
363
364impl Default for ReplicatedLog {
365    fn default() -> Self {
366        Self::new()
367    }
368}
369
370fn current_timestamp() -> u64 {
371    std::time::SystemTime::now()
372        .duration_since(std::time::UNIX_EPOCH)
373        .map(|d| d.as_millis() as u64)
374        .unwrap_or(0)
375}
376
377// =============================================================================
378// Tests
379// =============================================================================
380
381#[cfg(test)]
382mod tests {
383    use super::*;
384
385    #[test]
386    fn test_log_entry() {
387        let entry = LogEntry::command(1, 1, vec![1, 2, 3]);
388        assert_eq!(entry.index, 1);
389        assert_eq!(entry.term, 1);
390        assert_eq!(entry.entry_type, EntryType::Command);
391    }
392
393    #[test]
394    fn test_replicated_log() {
395        let log = ReplicatedLog::new();
396
397        let entry1 = LogEntry::command(1, 1, vec![1]);
398        let entry2 = LogEntry::command(2, 1, vec![2]);
399        let entry3 = LogEntry::command(3, 2, vec![3]);
400
401        log.append(entry1);
402        log.append(entry2);
403        log.append(entry3);
404
405        assert_eq!(log.len(), 3);
406        assert_eq!(log.last_index(), 3);
407        assert_eq!(log.last_term(), 2);
408    }
409
410    #[test]
411    fn test_get_entry() {
412        let log = ReplicatedLog::new();
413
414        log.append(LogEntry::command(1, 1, vec![1]));
415        log.append(LogEntry::command(2, 1, vec![2]));
416
417        let entry = log.get(1).unwrap();
418        assert_eq!(entry.index, 1);
419
420        let entry = log.get(2).unwrap();
421        assert_eq!(entry.index, 2);
422
423        assert!(log.get(3).is_none());
424    }
425
426    #[test]
427    fn test_commit_and_apply() {
428        let log = ReplicatedLog::new();
429
430        log.append(LogEntry::command(1, 1, vec![1]));
431        log.append(LogEntry::command(2, 1, vec![2]));
432        log.append(LogEntry::command(3, 1, vec![3]));
433
434        assert_eq!(log.commit_index(), 0);
435        assert_eq!(log.last_applied(), 0);
436
437        log.set_commit_index(2);
438        assert_eq!(log.commit_index(), 2);
439        assert!(log.has_entries_to_apply());
440
441        let entry = log.next_to_apply().unwrap();
442        assert_eq!(entry.index, 1);
443
444        log.set_last_applied(1);
445        let entry = log.next_to_apply().unwrap();
446        assert_eq!(entry.index, 2);
447    }
448
449    #[test]
450    fn test_truncate() {
451        let log = ReplicatedLog::new();
452
453        log.append(LogEntry::command(1, 1, vec![1]));
454        log.append(LogEntry::command(2, 1, vec![2]));
455        log.append(LogEntry::command(3, 2, vec![3]));
456
457        log.truncate_from(2);
458        assert_eq!(log.len(), 1);
459        assert_eq!(log.last_index(), 1);
460    }
461
462    #[test]
463    fn test_is_up_to_date() {
464        let log = ReplicatedLog::new();
465
466        log.append(LogEntry::command(1, 1, vec![1]));
467        log.append(LogEntry::command(2, 2, vec![2]));
468
469        assert!(log.is_up_to_date(2, 2));
470        assert!(log.is_up_to_date(3, 2));
471        assert!(log.is_up_to_date(1, 3));
472        assert!(!log.is_up_to_date(1, 1));
473    }
474}