1use std::collections::VecDeque;
2use trackable::error::ErrorKindExt;
3
4use cluster::ClusterConfig;
5use log::{LogEntry, LogIndex, LogPosition, LogPrefix, LogSuffix};
6use {ErrorKind, Result};
7
8#[derive(Debug, Clone)]
18pub struct LogHistory {
19 appended_tail: LogPosition,
20 committed_tail: LogPosition,
21 consumed_tail: LogPosition,
22 records: VecDeque<HistoryRecord>,
23}
24impl LogHistory {
25 pub fn new(config: ClusterConfig) -> Self {
27 let initial = HistoryRecord::new(LogPosition::default(), config);
28 LogHistory {
29 appended_tail: LogPosition::default(),
30 committed_tail: LogPosition::default(),
31 consumed_tail: LogPosition::default(),
32 records: vec![initial].into(),
33 }
34 }
35
36 pub fn head(&self) -> LogPosition {
38 self.records[0].head
39 }
40
41 pub fn tail(&self) -> LogPosition {
43 self.appended_tail
44 }
45
46 pub fn committed_tail(&self) -> LogPosition {
50 self.committed_tail
51 }
52
53 pub fn consumed_tail(&self) -> LogPosition {
55 self.consumed_tail
56 }
57
58 pub fn config(&self) -> &ClusterConfig {
60 &self.last_record().config
61 }
62
63 pub fn last_record(&self) -> &HistoryRecord {
65 self.records.back().expect("Never fails")
66 }
67
68 pub fn get_record(&self, index: LogIndex) -> Option<&HistoryRecord> {
72 for r in self.records.iter().rev() {
73 if r.head.index <= index {
74 return Some(&r);
75 }
76 }
77 None
78 }
79
80 pub fn record_appended(&mut self, suffix: &LogSuffix) -> Result<()> {
82 let entries_offset = if self.appended_tail.index <= suffix.head.index {
83 0
84 } else {
85 self.appended_tail.index - suffix.head.index
89 };
90 for (i, e) in suffix.entries.iter().enumerate().skip(entries_offset) {
91 let tail = LogPosition {
92 prev_term: e.term(),
93 index: suffix.head.index + i + 1,
94 };
95 if let LogEntry::Config { ref config, .. } = *e {
96 if self.last_record().config != *config {
97 let record = HistoryRecord::new(tail, config.clone());
99 self.records.push_back(record);
100 }
101 }
102 if tail.prev_term != self.last_record().head.prev_term {
103 track_assert!(
105 self.last_record().head.prev_term < tail.prev_term,
106 ErrorKind::Other,
107 "last_record.head={:?}, tail={:?}",
108 self.last_record().head,
109 tail
110 );
111 let record = HistoryRecord::new(tail, self.last_record().config.clone());
112 self.records.push_back(record);
113 }
114 }
115 self.appended_tail = suffix.tail();
116 Ok(())
117 }
118
119 pub fn record_committed(&mut self, new_tail_index: LogIndex) -> Result<()> {
121 track_assert!(
122 self.committed_tail.index <= new_tail_index,
123 ErrorKind::Other
124 );
125 track_assert!(
126 new_tail_index <= self.appended_tail.index,
127 ErrorKind::Other,
128 "new_tail_index={:?}, self.appended_tail.index={:?}",
129 new_tail_index,
130 self.appended_tail.index
131 );
132 let prev_term = track!(self
133 .get_record(new_tail_index,)
134 .ok_or_else(|| ErrorKind::Other.error(),))?
135 .head
136 .prev_term;
137 self.committed_tail = LogPosition {
138 prev_term,
139 index: new_tail_index,
140 };
141 Ok(())
142 }
143
144 pub fn record_consumed(&mut self, new_tail_index: LogIndex) -> Result<()> {
148 track_assert!(self.consumed_tail.index <= new_tail_index, ErrorKind::Other);
149 track_assert!(
150 new_tail_index <= self.committed_tail.index,
151 ErrorKind::Other
152 );
153
154 let prev_term =
155 track!(self.get_record(new_tail_index).ok_or_else(
156 || ErrorKind::Other.cause(format!("Too old index: {:?}", new_tail_index))
157 ))?
158 .head
159 .prev_term;
160 self.consumed_tail = LogPosition {
161 prev_term,
162 index: new_tail_index,
163 };
164 Ok(())
165 }
166
167 pub fn record_rollback(&mut self, new_tail: LogPosition) -> Result<()> {
171 track_assert!(new_tail.index <= self.appended_tail.index, ErrorKind::Other);
172 track_assert!(
173 self.committed_tail.index <= new_tail.index,
174 ErrorKind::Other,
175 "old={:?}, new={:?}",
176 self.committed_tail,
177 new_tail
178 );
179 track_assert_eq!(
180 self.get_record(new_tail.index).map(|r| r.head.prev_term),
181 Some(new_tail.prev_term),
182 ErrorKind::InconsistentState
183 );
184 self.appended_tail = new_tail;
185
186 if let Some(new_len) = self
187 .records
188 .iter()
189 .position(|r| r.head.index > new_tail.index)
190 {
191 self.records.truncate(new_len);
192 }
193 Ok(())
194 }
195
196 pub fn record_snapshot_installed(
207 &mut self,
208 new_head: LogPosition,
209 config: ClusterConfig,
210 ) -> Result<()> {
211 track_assert!(
212 self.head().index <= new_head.index,
213 ErrorKind::InconsistentState,
214 "self.head={:?}, new_head={:?}",
215 self.head(),
216 new_head
217 );
218
219 while self
221 .records
222 .front()
223 .map_or(false, |r| r.head.index <= new_head.index)
224 {
225 self.records.pop_front();
226 }
227
228 let record = HistoryRecord::new(new_head, config);
230 self.records.push_front(record);
231
232 if self.appended_tail.index < new_head.index {
233 self.appended_tail = new_head;
234 }
235 if self.committed_tail.index < new_head.index {
236 self.committed_tail = new_head;
237 }
238 Ok(())
239 }
240
241 pub fn record_snapshot_loaded(&mut self, snapshot: &LogPrefix) -> Result<()> {
245 if self.consumed_tail.index < snapshot.tail.index {
246 track_assert!(
247 snapshot.tail.index <= self.committed_tail.index,
248 ErrorKind::InconsistentState,
249 "snapshot.tail.index={:?}, self.committed_tail.index={:?}",
250 snapshot.tail.index,
251 self.committed_tail.index
252 );
253 self.consumed_tail = snapshot.tail;
254 }
255 Ok(())
256 }
257}
258
259#[derive(Debug, Clone)]
261pub struct HistoryRecord {
262 pub head: LogPosition,
264
265 pub config: ClusterConfig,
267}
268impl HistoryRecord {
269 fn new(head: LogPosition, config: ClusterConfig) -> Self {
270 HistoryRecord { head, config }
271 }
272}