1use serde::{Deserialize, Serialize};
9use std::collections::VecDeque;
10use std::sync::RwLock;
11
12pub type LogIndex = u64;
18
19pub type Term = u64;
21
22#[derive(Debug, Clone, Serialize, Deserialize)]
28pub struct LogEntry {
29 pub index: LogIndex,
30 pub term: Term,
31 pub entry_type: EntryType,
32 pub data: Vec<u8>,
33 pub timestamp: u64,
34}
35
36impl LogEntry {
37 pub fn new(index: LogIndex, term: Term, entry_type: EntryType, data: Vec<u8>) -> Self {
39 Self {
40 index,
41 term,
42 entry_type,
43 data,
44 timestamp: current_timestamp(),
45 }
46 }
47
48 pub fn command(index: LogIndex, term: Term, data: Vec<u8>) -> Self {
50 Self::new(index, term, EntryType::Command, data)
51 }
52
53 pub fn config_change(index: LogIndex, term: Term, data: Vec<u8>) -> Self {
55 Self::new(index, term, EntryType::ConfigChange, data)
56 }
57
58 pub fn noop(index: LogIndex, term: Term) -> Self {
60 Self::new(index, term, EntryType::NoOp, Vec::new())
61 }
62}
63
64#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
70pub enum EntryType {
71 Command,
73 ConfigChange,
75 NoOp,
77}
78
79pub struct ReplicatedLog {
85 entries: RwLock<VecDeque<LogEntry>>,
86 first_index: RwLock<LogIndex>,
87 commit_index: RwLock<LogIndex>,
88 last_applied: RwLock<LogIndex>,
89 snapshot_index: RwLock<LogIndex>,
90 snapshot_term: RwLock<Term>,
91}
92
93impl ReplicatedLog {
94 pub fn new() -> Self {
96 Self {
97 entries: RwLock::new(VecDeque::new()),
98 first_index: RwLock::new(1),
99 commit_index: RwLock::new(0),
100 last_applied: RwLock::new(0),
101 snapshot_index: RwLock::new(0),
102 snapshot_term: RwLock::new(0),
103 }
104 }
105
106 pub fn append(&self, entry: LogEntry) -> LogIndex {
108 let mut entries = self.entries.write().unwrap();
109 let index = entry.index;
110 entries.push_back(entry);
111 index
112 }
113
114 pub fn append_entries(&self, new_entries: Vec<LogEntry>) -> LogIndex {
116 let mut entries = self.entries.write().unwrap();
117 let mut last_index = 0;
118
119 for entry in new_entries {
120 last_index = entry.index;
121 entries.push_back(entry);
122 }
123
124 last_index
125 }
126
127 pub fn get(&self, index: LogIndex) -> Option<LogEntry> {
129 let entries = self.entries.read().unwrap();
130 let first = *self.first_index.read().unwrap();
131
132 if index < first {
133 return None;
134 }
135
136 let offset = (index - first) as usize;
137 entries.get(offset).cloned()
138 }
139
140 pub fn get_range(&self, start: LogIndex, end: LogIndex) -> Vec<LogEntry> {
142 let entries = self.entries.read().unwrap();
143 let first = *self.first_index.read().unwrap();
144
145 if start < first {
146 return Vec::new();
147 }
148
149 let start_offset = (start - first) as usize;
150 let end_offset = (end - first) as usize;
151
152 entries
153 .iter()
154 .skip(start_offset)
155 .take(end_offset.saturating_sub(start_offset))
156 .cloned()
157 .collect()
158 }
159
160 pub fn last_index(&self) -> LogIndex {
162 let entries = self.entries.read().unwrap();
163 let first = *self.first_index.read().unwrap();
164
165 if entries.is_empty() {
166 let snapshot = *self.snapshot_index.read().unwrap();
167 return snapshot;
168 }
169
170 first + entries.len() as u64 - 1
171 }
172
173 pub fn last_term(&self) -> Term {
175 let entries = self.entries.read().unwrap();
176
177 if let Some(entry) = entries.back() {
178 return entry.term;
179 }
180
181 *self.snapshot_term.read().unwrap()
182 }
183
184 pub fn term_at(&self, index: LogIndex) -> Option<Term> {
186 if index == 0 {
187 return Some(0);
188 }
189
190 let snapshot_index = *self.snapshot_index.read().unwrap();
191 if index == snapshot_index {
192 return Some(*self.snapshot_term.read().unwrap());
193 }
194
195 self.get(index).map(|e| e.term)
196 }
197
198 pub fn commit_index(&self) -> LogIndex {
200 *self.commit_index.read().unwrap()
201 }
202
203 pub fn set_commit_index(&self, index: LogIndex) {
205 let mut commit = self.commit_index.write().unwrap();
206 if index > *commit {
207 *commit = index;
208 }
209 }
210
211 pub fn last_applied(&self) -> LogIndex {
213 *self.last_applied.read().unwrap()
214 }
215
216 pub fn set_last_applied(&self, index: LogIndex) {
218 let mut applied = self.last_applied.write().unwrap();
219 *applied = index;
220 }
221
222 pub fn has_entries_to_apply(&self) -> bool {
224 let commit = self.commit_index();
225 let applied = self.last_applied();
226 commit > applied
227 }
228
229 pub fn next_to_apply(&self) -> Option<LogEntry> {
231 let commit = self.commit_index();
232 let applied = self.last_applied();
233
234 if commit > applied {
235 self.get(applied + 1)
236 } else {
237 None
238 }
239 }
240
241 pub fn truncate_from(&self, index: LogIndex) {
243 let mut entries = self.entries.write().unwrap();
244 let first = *self.first_index.read().unwrap();
245
246 if index < first {
247 entries.clear();
248 return;
249 }
250
251 let offset = (index - first) as usize;
252 entries.truncate(offset);
253 }
254
255 pub fn compact(&self, up_to: LogIndex, term: Term) {
257 let mut entries = self.entries.write().unwrap();
258 let first = *self.first_index.read().unwrap();
259
260 if up_to < first {
261 return;
262 }
263
264 let remove_count = (up_to - first + 1) as usize;
265 for _ in 0..remove_count.min(entries.len()) {
266 entries.pop_front();
267 }
268
269 *self.first_index.write().unwrap() = up_to + 1;
270 *self.snapshot_index.write().unwrap() = up_to;
271 *self.snapshot_term.write().unwrap() = term;
272 }
273
274 pub fn len(&self) -> usize {
276 let entries = self.entries.read().unwrap();
277 entries.len()
278 }
279
280 pub fn is_empty(&self) -> bool {
282 self.len() == 0
283 }
284
285 pub fn is_up_to_date(&self, last_log_index: LogIndex, last_log_term: Term) -> bool {
287 let our_last_term = self.last_term();
288 let our_last_index = self.last_index();
289
290 if last_log_term > our_last_term {
291 return true;
292 }
293
294 if last_log_term == our_last_term && last_log_index >= our_last_index {
295 return true;
296 }
297
298 false
299 }
300
301 pub fn find_conflict(&self, entries: &[LogEntry]) -> Option<LogIndex> {
303 for entry in entries {
304 if let Some(term) = self.term_at(entry.index) {
305 if term != entry.term {
306 return Some(entry.index);
307 }
308 }
309 }
310 None
311 }
312}
313
314impl Default for ReplicatedLog {
315 fn default() -> Self {
316 Self::new()
317 }
318}
319
320fn current_timestamp() -> u64 {
321 std::time::SystemTime::now()
322 .duration_since(std::time::UNIX_EPOCH)
323 .map(|d| d.as_millis() as u64)
324 .unwrap_or(0)
325}
326
327#[cfg(test)]
332mod tests {
333 use super::*;
334
335 #[test]
336 fn test_log_entry() {
337 let entry = LogEntry::command(1, 1, vec![1, 2, 3]);
338 assert_eq!(entry.index, 1);
339 assert_eq!(entry.term, 1);
340 assert_eq!(entry.entry_type, EntryType::Command);
341 }
342
343 #[test]
344 fn test_replicated_log() {
345 let log = ReplicatedLog::new();
346
347 let entry1 = LogEntry::command(1, 1, vec![1]);
348 let entry2 = LogEntry::command(2, 1, vec![2]);
349 let entry3 = LogEntry::command(3, 2, vec![3]);
350
351 log.append(entry1);
352 log.append(entry2);
353 log.append(entry3);
354
355 assert_eq!(log.len(), 3);
356 assert_eq!(log.last_index(), 3);
357 assert_eq!(log.last_term(), 2);
358 }
359
360 #[test]
361 fn test_get_entry() {
362 let log = ReplicatedLog::new();
363
364 log.append(LogEntry::command(1, 1, vec![1]));
365 log.append(LogEntry::command(2, 1, vec![2]));
366
367 let entry = log.get(1).unwrap();
368 assert_eq!(entry.index, 1);
369
370 let entry = log.get(2).unwrap();
371 assert_eq!(entry.index, 2);
372
373 assert!(log.get(3).is_none());
374 }
375
376 #[test]
377 fn test_commit_and_apply() {
378 let log = ReplicatedLog::new();
379
380 log.append(LogEntry::command(1, 1, vec![1]));
381 log.append(LogEntry::command(2, 1, vec![2]));
382 log.append(LogEntry::command(3, 1, vec![3]));
383
384 assert_eq!(log.commit_index(), 0);
385 assert_eq!(log.last_applied(), 0);
386
387 log.set_commit_index(2);
388 assert_eq!(log.commit_index(), 2);
389 assert!(log.has_entries_to_apply());
390
391 let entry = log.next_to_apply().unwrap();
392 assert_eq!(entry.index, 1);
393
394 log.set_last_applied(1);
395 let entry = log.next_to_apply().unwrap();
396 assert_eq!(entry.index, 2);
397 }
398
399 #[test]
400 fn test_truncate() {
401 let log = ReplicatedLog::new();
402
403 log.append(LogEntry::command(1, 1, vec![1]));
404 log.append(LogEntry::command(2, 1, vec![2]));
405 log.append(LogEntry::command(3, 2, vec![3]));
406
407 log.truncate_from(2);
408 assert_eq!(log.len(), 1);
409 assert_eq!(log.last_index(), 1);
410 }
411
412 #[test]
413 fn test_is_up_to_date() {
414 let log = ReplicatedLog::new();
415
416 log.append(LogEntry::command(1, 1, vec![1]));
417 log.append(LogEntry::command(2, 2, vec![2]));
418
419 assert!(log.is_up_to_date(2, 2));
420 assert!(log.is_up_to_date(3, 2));
421 assert!(log.is_up_to_date(1, 3));
422 assert!(!log.is_up_to_date(1, 1));
423 }
424}