1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
use super::*;
pub enum TryInsertResult {
Inserted,
/// If the entry is already inserted then we can skip the insertion.
SkippedInsertion,
/// If the entry is inconsistent with the log then we should reject the entry.
/// In this case, the leader should rewind the replication status to the follower.
InconsistencyDetected,
}
impl CommandLog {
/// Append a new entry to the log.
/// If `term` is None, then the term of the last entry is used.
/// Otherwise, the given term is used to update the term of the last entry.
pub async fn append_new_entry(&self, command: Bytes, term: Option<Term>) -> Result<Index> {
let _g = self.append_lock.lock().await;
let cur_last_log_index = self.get_log_last_index().await?;
let prev_clock = self.get_entry(cur_last_log_index).await?.this_clock;
let append_index = cur_last_log_index + 1;
let this_term = match term {
Some(t) => t,
None => prev_clock.term,
};
let this_clock = Clock {
term: this_term,
index: append_index,
};
let e = Entry {
prev_clock,
this_clock,
command,
};
self.insert_entry(e).await?;
Ok(append_index)
}
pub async fn try_insert_entry(
&self,
entry: Entry,
sender_id: NodeId,
driver: RaftDriver,
) -> Result<TryInsertResult> {
// If the entry is snapshot then we should insert this entry without consistency checks.
// Old entries before the new snapshot will be garbage collected.
match Command::deserialize(&entry.command) {
Command::Snapshot { .. } => {
let Clock {
term: _,
index: snapshot_index,
} = entry.this_clock;
warn!(
"log is too old. replicated a snapshot (idx={}) from leader",
snapshot_index
);
// Invariant: snapshot entry exists => snapshot exists
if let Err(e) = self
.app
.fetch_snapshot(snapshot_index, sender_id.clone(), driver)
.await
{
error!(
"could not fetch app snapshot (idx={}) from sender {}",
snapshot_index, sender_id,
);
return Err(e);
}
self.insert_snapshot(entry).await?;
self.commit_pointer
.store(snapshot_index - 1, Ordering::SeqCst);
self.kern_pointer
.store(snapshot_index - 1, Ordering::SeqCst);
self.user_pointer
.store(snapshot_index - 1, Ordering::SeqCst);
return Ok(TryInsertResult::Inserted);
}
_ => {}
}
let Clock {
term: _,
index: prev_index,
} = entry.prev_clock;
if let Some(prev_clock) = self
.storage
.get_entry(prev_index)
.await?
.map(|x| x.this_clock)
{
if prev_clock != entry.prev_clock {
// consistency check failed.
Ok(TryInsertResult::InconsistencyDetected)
} else {
let Clock {
term: _,
index: this_index,
} = entry.this_clock;
// optimization to skip actual insertion.
if let Some(old_clock) = self
.storage
.get_entry(this_index)
.await?
.map(|e| e.this_clock)
{
if old_clock == entry.this_clock {
// If there is a entry with the same term and index
// then the entry should be the same so to skip insertion.
return Ok(TryInsertResult::SkippedInsertion);
}
}
self.insert_entry(entry).await?;
// discard [this_index, )
self.user_completions.lock().split_off(&this_index);
self.kern_completions.lock().split_off(&this_index);
Ok(TryInsertResult::Inserted)
}
} else {
Ok(TryInsertResult::InconsistencyDetected)
}
}
}