1use serde::{Deserialize, Serialize};
9use std::collections::VecDeque;
10use std::sync::RwLock;
11
/// Position of an entry in the replicated log.
///
/// [`ReplicatedLog::new`] expects the first stored entry to carry index 1, and
/// [`ReplicatedLog::term_at`] treats index 0 as the slot before the first entry.
pub type LogIndex = u64;

/// Term number recorded on every [`LogEntry`].
pub type Term = u64;
21
22#[derive(Debug, Clone, Serialize, Deserialize)]
28pub struct LogEntry {
29 pub index: LogIndex,
30 pub term: Term,
31 pub entry_type: EntryType,
32 pub data: Vec<u8>,
33 pub timestamp: u64,
34}
35
36impl LogEntry {
37 pub fn new(index: LogIndex, term: Term, entry_type: EntryType, data: Vec<u8>) -> Self {
39 Self {
40 index,
41 term,
42 entry_type,
43 data,
44 timestamp: current_timestamp(),
45 }
46 }
47
48 pub fn command(index: LogIndex, term: Term, data: Vec<u8>) -> Self {
50 Self::new(index, term, EntryType::Command, data)
51 }
52
53 pub fn config_change(index: LogIndex, term: Term, data: Vec<u8>) -> Self {
55 Self::new(index, term, EntryType::ConfigChange, data)
56 }
57
58 pub fn noop(index: LogIndex, term: Term) -> Self {
60 Self::new(index, term, EntryType::NoOp, Vec::new())
61 }
62}
63
64#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
70pub enum EntryType {
71 Command,
73 ConfigChange,
75 NoOp,
77}
78
79pub struct ReplicatedLog {
85 entries: RwLock<VecDeque<LogEntry>>,
86 first_index: RwLock<LogIndex>,
87 commit_index: RwLock<LogIndex>,
88 last_applied: RwLock<LogIndex>,
89 snapshot_index: RwLock<LogIndex>,
90 snapshot_term: RwLock<Term>,
91}
92
93impl ReplicatedLog {
94 pub fn new() -> Self {
96 Self {
97 entries: RwLock::new(VecDeque::new()),
98 first_index: RwLock::new(1),
99 commit_index: RwLock::new(0),
100 last_applied: RwLock::new(0),
101 snapshot_index: RwLock::new(0),
102 snapshot_term: RwLock::new(0),
103 }
104 }
105
106 pub fn append(&self, entry: LogEntry) -> LogIndex {
108 let mut entries = self.entries.write().expect("log entries lock poisoned");
109 let index = entry.index;
110 entries.push_back(entry);
111 index
112 }
113
114 pub fn append_entries(&self, new_entries: Vec<LogEntry>) -> LogIndex {
116 let mut entries = self.entries.write().expect("log entries lock poisoned");
117 let mut last_index = 0;
118
119 for entry in new_entries {
120 last_index = entry.index;
121 entries.push_back(entry);
122 }
123
124 last_index
125 }
126
127 pub fn get(&self, index: LogIndex) -> Option<LogEntry> {
129 let entries = self.entries.read().expect("log entries lock poisoned");
130 let first = *self
131 .first_index
132 .read()
133 .expect("log first_index lock poisoned");
134
135 if index < first {
136 return None;
137 }
138
139 let offset = (index - first) as usize;
140 entries.get(offset).cloned()
141 }
142
143 pub fn get_range(&self, start: LogIndex, end: LogIndex) -> Vec<LogEntry> {
145 let entries = self.entries.read().expect("log entries lock poisoned");
146 let first = *self
147 .first_index
148 .read()
149 .expect("log first_index lock poisoned");
150
151 if start < first {
152 return Vec::new();
153 }
154
155 let start_offset = (start - first) as usize;
156 let end_offset = (end - first) as usize;
157
158 entries
159 .iter()
160 .skip(start_offset)
161 .take(end_offset.saturating_sub(start_offset))
162 .cloned()
163 .collect()
164 }
165
166 pub fn last_index(&self) -> LogIndex {
168 let entries = self.entries.read().expect("log entries lock poisoned");
169 let first = *self
170 .first_index
171 .read()
172 .expect("log first_index lock poisoned");
173
174 if entries.is_empty() {
175 let snapshot = *self
176 .snapshot_index
177 .read()
178 .expect("log snapshot_index lock poisoned");
179 return snapshot;
180 }
181
182 first + entries.len() as u64 - 1
183 }
184
185 pub fn last_term(&self) -> Term {
187 let entries = self.entries.read().expect("log entries lock poisoned");
188
189 if let Some(entry) = entries.back() {
190 return entry.term;
191 }
192
193 *self
194 .snapshot_term
195 .read()
196 .expect("log snapshot_term lock poisoned")
197 }
198
199 pub fn term_at(&self, index: LogIndex) -> Option<Term> {
201 if index == 0 {
202 return Some(0);
203 }
204
205 let snapshot_index = *self
206 .snapshot_index
207 .read()
208 .expect("log snapshot_index lock poisoned");
209 if index == snapshot_index {
210 return Some(
211 *self
212 .snapshot_term
213 .read()
214 .expect("log snapshot_term lock poisoned"),
215 );
216 }
217
218 self.get(index).map(|e| e.term)
219 }
220
221 pub fn commit_index(&self) -> LogIndex {
223 *self
224 .commit_index
225 .read()
226 .expect("log commit_index lock poisoned")
227 }
228
229 pub fn set_commit_index(&self, index: LogIndex) {
231 let mut commit = self
232 .commit_index
233 .write()
234 .expect("log commit_index lock poisoned");
235 if index > *commit {
236 *commit = index;
237 }
238 }
239
240 pub fn last_applied(&self) -> LogIndex {
242 *self
243 .last_applied
244 .read()
245 .expect("log last_applied lock poisoned")
246 }
247
248 pub fn set_last_applied(&self, index: LogIndex) {
250 let mut applied = self
251 .last_applied
252 .write()
253 .expect("log last_applied lock poisoned");
254 *applied = index;
255 }
256
257 pub fn has_entries_to_apply(&self) -> bool {
259 let commit = self.commit_index();
260 let applied = self.last_applied();
261 commit > applied
262 }
263
264 pub fn next_to_apply(&self) -> Option<LogEntry> {
266 let commit = self.commit_index();
267 let applied = self.last_applied();
268
269 if commit > applied {
270 self.get(applied + 1)
271 } else {
272 None
273 }
274 }
275
276 pub fn truncate_from(&self, index: LogIndex) {
278 let mut entries = self.entries.write().expect("log entries lock poisoned");
279 let first = *self
280 .first_index
281 .read()
282 .expect("log first_index lock poisoned");
283
284 if index < first {
285 entries.clear();
286 return;
287 }
288
289 let offset = (index - first) as usize;
290 entries.truncate(offset);
291 }
292
293 pub fn compact(&self, up_to: LogIndex, term: Term) {
295 let mut entries = self.entries.write().expect("log entries lock poisoned");
296 let first = *self
297 .first_index
298 .read()
299 .expect("log first_index lock poisoned");
300
301 if up_to < first {
302 return;
303 }
304
305 let remove_count = (up_to - first + 1) as usize;
306 for _ in 0..remove_count.min(entries.len()) {
307 entries.pop_front();
308 }
309
310 *self
311 .first_index
312 .write()
313 .expect("log first_index lock poisoned") = up_to + 1;
314 *self
315 .snapshot_index
316 .write()
317 .expect("log snapshot_index lock poisoned") = up_to;
318 *self
319 .snapshot_term
320 .write()
321 .expect("log snapshot_term lock poisoned") = term;
322 }
323
324 pub fn len(&self) -> usize {
326 let entries = self.entries.read().expect("log entries lock poisoned");
327 entries.len()
328 }
329
330 pub fn is_empty(&self) -> bool {
332 self.len() == 0
333 }
334
335 pub fn is_up_to_date(&self, last_log_index: LogIndex, last_log_term: Term) -> bool {
337 let our_last_term = self.last_term();
338 let our_last_index = self.last_index();
339
340 if last_log_term > our_last_term {
341 return true;
342 }
343
344 if last_log_term == our_last_term && last_log_index >= our_last_index {
345 return true;
346 }
347
348 false
349 }
350
351 pub fn find_conflict(&self, entries: &[LogEntry]) -> Option<LogIndex> {
353 for entry in entries {
354 if let Some(term) = self.term_at(entry.index) {
355 if term != entry.term {
356 return Some(entry.index);
357 }
358 }
359 }
360 None
361 }
362}
363
364impl Default for ReplicatedLog {
365 fn default() -> Self {
366 Self::new()
367 }
368}
369
/// Wall-clock time in whole milliseconds since the Unix epoch.
///
/// Returns 0 when the system clock reads earlier than the epoch. The value is
/// assembled from whole seconds plus the sub-second milliseconds with
/// saturating arithmetic, so no narrowing cast is involved.
fn current_timestamp() -> u64 {
    std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map(|elapsed| {
            elapsed
                .as_secs()
                .saturating_mul(1_000)
                .saturating_add(u64::from(elapsed.subsec_millis()))
        })
        .unwrap_or(0)
}
376
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a log holding one command entry per element of `terms`, indexed from 1.
    fn log_with_terms(terms: &[Term]) -> ReplicatedLog {
        let log = ReplicatedLog::new();
        for (offset, &term) in terms.iter().enumerate() {
            log.append(LogEntry::command(offset as LogIndex + 1, term, vec![offset as u8]));
        }
        log
    }

    /// Collects just the indices of `entries`, which keeps range assertions short.
    fn indices_of(entries: &[LogEntry]) -> Vec<LogIndex> {
        entries.iter().map(|entry| entry.index).collect()
    }

    #[test]
    fn test_log_entry() {
        let entry = LogEntry::command(1, 1, vec![1, 2, 3]);
        assert_eq!(entry.index, 1);
        assert_eq!(entry.term, 1);
        assert_eq!(entry.entry_type, EntryType::Command);
    }

    #[test]
    fn test_entry_constructors() {
        let change = LogEntry::config_change(2, 1, vec![9]);
        assert_eq!(change.entry_type, EntryType::ConfigChange);
        assert_eq!(change.data, vec![9]);

        let noop = LogEntry::noop(3, 2);
        assert_eq!(noop.entry_type, EntryType::NoOp);
        assert!(noop.data.is_empty());
    }

    #[test]
    fn test_replicated_log() {
        let log = ReplicatedLog::new();

        let entry1 = LogEntry::command(1, 1, vec![1]);
        let entry2 = LogEntry::command(2, 1, vec![2]);
        let entry3 = LogEntry::command(3, 2, vec![3]);

        log.append(entry1);
        log.append(entry2);
        log.append(entry3);

        assert_eq!(log.len(), 3);
        assert_eq!(log.last_index(), 3);
        assert_eq!(log.last_term(), 2);
    }

    #[test]
    fn test_append_entries() {
        let log = ReplicatedLog::new();

        let last = log.append_entries(vec![
            LogEntry::command(1, 1, vec![1]),
            LogEntry::command(2, 2, vec![2]),
        ]);

        assert_eq!(last, 2);
        assert_eq!(log.len(), 2);
        assert_eq!(log.last_index(), 2);
        assert_eq!(log.last_term(), 2);
    }

    #[test]
    fn test_get_entry() {
        let log = ReplicatedLog::new();

        log.append(LogEntry::command(1, 1, vec![1]));
        log.append(LogEntry::command(2, 1, vec![2]));

        let entry = log.get(1).unwrap();
        assert_eq!(entry.index, 1);

        let entry = log.get(2).unwrap();
        assert_eq!(entry.index, 2);

        assert!(log.get(3).is_none());
    }

    #[test]
    fn test_get_range() {
        let log = log_with_terms(&[1, 1, 1, 1]);

        // The end bound is exclusive.
        assert_eq!(indices_of(&log.get_range(2, 4)), vec![2, 3]);
        assert!(log.get_range(3, 3).is_empty());
        // A start before the first retained index yields nothing.
        assert!(log.get_range(0, 2).is_empty());
        // A range running past the end is clipped to what exists.
        assert_eq!(indices_of(&log.get_range(3, 10)), vec![3, 4]);
    }

    #[test]
    fn test_commit_and_apply() {
        let log = ReplicatedLog::new();

        log.append(LogEntry::command(1, 1, vec![1]));
        log.append(LogEntry::command(2, 1, vec![2]));
        log.append(LogEntry::command(3, 1, vec![3]));

        assert_eq!(log.commit_index(), 0);
        assert_eq!(log.last_applied(), 0);

        log.set_commit_index(2);
        assert_eq!(log.commit_index(), 2);
        assert!(log.has_entries_to_apply());

        let entry = log.next_to_apply().unwrap();
        assert_eq!(entry.index, 1);

        log.set_last_applied(1);
        let entry = log.next_to_apply().unwrap();
        assert_eq!(entry.index, 2);
    }

    #[test]
    fn test_commit_index_never_decreases() {
        let log = log_with_terms(&[1, 1, 1]);

        log.set_commit_index(2);
        log.set_commit_index(1);

        assert_eq!(log.commit_index(), 2);
    }

    #[test]
    fn test_truncate() {
        let log = ReplicatedLog::new();

        log.append(LogEntry::command(1, 1, vec![1]));
        log.append(LogEntry::command(2, 1, vec![2]));
        log.append(LogEntry::command(3, 2, vec![3]));

        log.truncate_from(2);
        assert_eq!(log.len(), 1);
        assert_eq!(log.last_index(), 1);
    }

    #[test]
    fn test_compact() {
        let log = log_with_terms(&[1, 1, 2]);

        log.compact(2, 1);
        assert_eq!(log.len(), 1);
        assert_eq!(log.last_index(), 3);
        assert!(log.get(2).is_none());
        assert_eq!(log.get(3).unwrap().index, 3);
        assert_eq!(log.term_at(1), None);
        assert_eq!(log.term_at(2), Some(1));
        assert_eq!(log.term_at(3), Some(2));

        // Compacting the remaining entry leaves only the snapshot position.
        log.compact(3, 2);
        assert!(log.is_empty());
        assert_eq!(log.last_index(), 3);
        assert_eq!(log.last_term(), 2);
    }

    #[test]
    fn test_term_at() {
        let log = log_with_terms(&[1, 3]);

        assert_eq!(log.term_at(0), Some(0));
        assert_eq!(log.term_at(1), Some(1));
        assert_eq!(log.term_at(2), Some(3));
        assert_eq!(log.term_at(3), None);
    }

    #[test]
    fn test_is_up_to_date() {
        let log = ReplicatedLog::new();

        log.append(LogEntry::command(1, 1, vec![1]));
        log.append(LogEntry::command(2, 2, vec![2]));

        assert!(log.is_up_to_date(2, 2));
        assert!(log.is_up_to_date(3, 2));
        assert!(log.is_up_to_date(1, 3));
        assert!(!log.is_up_to_date(1, 1));
    }

    #[test]
    fn test_find_conflict() {
        let log = log_with_terms(&[1, 1, 2]);

        let incoming = vec![
            LogEntry::command(2, 1, vec![]),
            LogEntry::command(3, 3, vec![]),
            LogEntry::command(4, 3, vec![]),
        ];
        assert_eq!(log.find_conflict(&incoming), Some(3));

        // Indices the log has no term for are not conflicts.
        assert_eq!(log.find_conflict(&[LogEntry::command(4, 3, vec![])]), None);
        assert_eq!(log.find_conflict(&[]), None);
    }
}
474}