Skip to main content

raftlog/log/
history.rs

1use std::collections::VecDeque;
2use trackable::error::ErrorKindExt;
3
4use cluster::ClusterConfig;
5use log::{LogEntry, LogIndex, LogPosition, LogPrefix, LogSuffix};
6use {ErrorKind, Result};
7
8/// ローカルログの歴史(要約)を保持するためのデータ構造.
9///
10/// スナップショット地点以降のローカルログに関して発生した、
11/// 重要な出来事(i.g., `Term`の変更)が記録されている.
12///
13/// それ以外に「ログの末尾(log_tail)」および「ログのコミット済み末尾(log_committed_tail)」、
14/// 「ログの消費済み末尾(log_consumed_tail)」の三つの地点を保持している.
15///
16/// それらの関しては`log_consumed_tail <= log_committed_tail <= log_tail`の不変項が維持される.
17#[derive(Debug, Clone)]
18pub struct LogHistory {
19    appended_tail: LogPosition,
20    committed_tail: LogPosition,
21    consumed_tail: LogPosition,
22    records: VecDeque<HistoryRecord>,
23}
24impl LogHistory {
25    /// 初期クラスタ構成を与えて、新しい`LogHistory`インスタンスを生成する.
26    pub fn new(config: ClusterConfig) -> Self {
27        let initial = HistoryRecord::new(LogPosition::default(), config);
28        LogHistory {
29            appended_tail: LogPosition::default(),
30            committed_tail: LogPosition::default(),
31            consumed_tail: LogPosition::default(),
32            records: vec![initial].into(),
33        }
34    }
35
36    /// ローカルログの先端位置を返す.
37    pub fn head(&self) -> LogPosition {
38        self.records[0].head
39    }
40
41    /// ローカルログの終端位置を返す.
42    pub fn tail(&self) -> LogPosition {
43        self.appended_tail
44    }
45
46    /// ローカルログのコミット済みの終端位置を返す.
47    ///
48    /// 「コミット済みの終端」==「未コミットの始端」
49    pub fn committed_tail(&self) -> LogPosition {
50        self.committed_tail
51    }
52
53    /// ローカルログの適用済みの終端位置を返す.
54    pub fn consumed_tail(&self) -> LogPosition {
55        self.consumed_tail
56    }
57
58    /// ローカルログに記録された最新のクラスタ構成を返す.
59    pub fn config(&self) -> &ClusterConfig {
60        &self.last_record().config
61    }
62
63    /// 最後に追加された`HistoryRecord`を返す.
64    pub fn last_record(&self) -> &HistoryRecord {
65        self.records.back().expect("Never fails")
66    }
67
68    /// 指定されたインデックスが属するレコードを返す.
69    ///
70    /// 既に削除された領域が指定された場合には`None`が返される.
71    pub fn get_record(&self, index: LogIndex) -> Option<&HistoryRecord> {
72        for r in self.records.iter().rev() {
73            if r.head.index <= index {
74                return Some(&r);
75            }
76        }
77        None
78    }
79
80    /// `suffix`がローカルログに追記されたことを記録する.
81    pub fn record_appended(&mut self, suffix: &LogSuffix) -> Result<()> {
82        let entries_offset = if self.appended_tail.index <= suffix.head.index {
83            0
84        } else {
85            // NOTE:
86            // 追記中にスナップショットがインストールされた場合に、
87            // 両者の先頭位置がズレることがあるので調整する
88            self.appended_tail.index - suffix.head.index
89        };
90        for (i, e) in suffix.entries.iter().enumerate().skip(entries_offset) {
91            let tail = LogPosition {
92                prev_term: e.term(),
93                index: suffix.head.index + i + 1,
94            };
95            if let LogEntry::Config { ref config, .. } = *e {
96                if self.last_record().config != *config {
97                    // クラスタ構成が変更された
98                    let record = HistoryRecord::new(tail, config.clone());
99                    self.records.push_back(record);
100                }
101            }
102            if tail.prev_term != self.last_record().head.prev_term {
103                // 新しい選挙期間(`Term`)に移った
104                track_assert!(
105                    self.last_record().head.prev_term < tail.prev_term,
106                    ErrorKind::Other,
107                    "last_record.head={:?}, tail={:?}",
108                    self.last_record().head,
109                    tail
110                );
111                let record = HistoryRecord::new(tail, self.last_record().config.clone());
112                self.records.push_back(record);
113            }
114        }
115        self.appended_tail = suffix.tail();
116        Ok(())
117    }
118
119    /// `new_tail_index`までコミット済み地点が進んだことを記録する.
120    pub fn record_committed(&mut self, new_tail_index: LogIndex) -> Result<()> {
121        track_assert!(
122            self.committed_tail.index <= new_tail_index,
123            ErrorKind::Other
124        );
125        track_assert!(
126            new_tail_index <= self.appended_tail.index,
127            ErrorKind::Other,
128            "new_tail_index={:?}, self.appended_tail.index={:?}",
129            new_tail_index,
130            self.appended_tail.index
131        );
132        let prev_term = track!(self
133            .get_record(new_tail_index,)
134            .ok_or_else(|| ErrorKind::Other.error(),))?
135        .head
136        .prev_term;
137        self.committed_tail = LogPosition {
138            prev_term,
139            index: new_tail_index,
140        };
141        Ok(())
142    }
143
144    /// `new_tail`までのログに含まれるコマンドが消費されたことを記録する.
145    ///
146    /// ここでの"消費"とは「状態機械に入力として渡されて実行された」ことを意味する.
147    pub fn record_consumed(&mut self, new_tail_index: LogIndex) -> Result<()> {
148        track_assert!(self.consumed_tail.index <= new_tail_index, ErrorKind::Other);
149        track_assert!(
150            new_tail_index <= self.committed_tail.index,
151            ErrorKind::Other
152        );
153
154        let prev_term =
155            track!(self.get_record(new_tail_index).ok_or_else(
156                || ErrorKind::Other.cause(format!("Too old index: {:?}", new_tail_index))
157            ))?
158            .head
159            .prev_term;
160        self.consumed_tail = LogPosition {
161            prev_term,
162            index: new_tail_index,
163        };
164        Ok(())
165    }
166
167    /// 「追記済み and 未コミット」な末尾領域がロールバック(破棄)されたことを記録する.
168    ///
169    /// ログの新しい終端は`new_tail`となる.
170    pub fn record_rollback(&mut self, new_tail: LogPosition) -> Result<()> {
171        track_assert!(new_tail.index <= self.appended_tail.index, ErrorKind::Other);
172        track_assert!(
173            self.committed_tail.index <= new_tail.index,
174            ErrorKind::Other,
175            "old={:?}, new={:?}",
176            self.committed_tail,
177            new_tail
178        );
179        track_assert_eq!(
180            self.get_record(new_tail.index).map(|r| r.head.prev_term),
181            Some(new_tail.prev_term),
182            ErrorKind::InconsistentState
183        );
184        self.appended_tail = new_tail;
185
186        if let Some(new_len) = self
187            .records
188            .iter()
189            .position(|r| r.head.index > new_tail.index)
190        {
191            self.records.truncate(new_len);
192        }
193        Ok(())
194    }
195
196    /// スナップショットがインストールされたことを記録する.
197    ///
198    /// `new_head`はスナップショットに含まれない最初のエントリのIDで、
199    /// `config`はスナップショット取得時のクラスタ構成、を示す.
200    ///
201    /// `new_head`は、現在のログの末尾を超えていても良いが、
202    /// 現在のログの先頭以前のものは許容されない.
203    /// (スナップショット地点から現在までの歴史が消失してしまうため)
204    ///
205    /// なお、`head`以前の記録は歴史から削除される.
206    pub fn record_snapshot_installed(
207        &mut self,
208        new_head: LogPosition,
209        config: ClusterConfig,
210    ) -> Result<()> {
211        track_assert!(
212            self.head().index <= new_head.index,
213            ErrorKind::InconsistentState,
214            "self.head={:?}, new_head={:?}",
215            self.head(),
216            new_head
217        );
218
219        // スナップショット地点までの歴史は捨てる
220        while self
221            .records
222            .front()
223            .map_or(false, |r| r.head.index <= new_head.index)
224        {
225            self.records.pop_front();
226        }
227
228        // 新しいログの先頭をセット
229        let record = HistoryRecord::new(new_head, config);
230        self.records.push_front(record);
231
232        if self.appended_tail.index < new_head.index {
233            self.appended_tail = new_head;
234        }
235        if self.committed_tail.index < new_head.index {
236            self.committed_tail = new_head;
237        }
238        Ok(())
239    }
240
241    /// スナップショットが読み込まれたことを記録する.
242    ///
243    /// ローカルログ内のスナップショット地点までのエントリは、消費されたものとして扱われる.
244    pub fn record_snapshot_loaded(&mut self, snapshot: &LogPrefix) -> Result<()> {
245        if self.consumed_tail.index < snapshot.tail.index {
246            track_assert!(
247                snapshot.tail.index <= self.committed_tail.index,
248                ErrorKind::InconsistentState,
249                "snapshot.tail.index={:?}, self.committed_tail.index={:?}",
250                snapshot.tail.index,
251                self.committed_tail.index
252            );
253            self.consumed_tail = snapshot.tail;
254        }
255        Ok(())
256    }
257}
258
259/// `LogHistory`に保持されるレコード.
260#[derive(Debug, Clone)]
261pub struct HistoryRecord {
262    /// 記録地点.
263    pub head: LogPosition,
264
265    /// 記録時のクラスタ構成.
266    pub config: ClusterConfig,
267}
268impl HistoryRecord {
269    fn new(head: LogPosition, config: ClusterConfig) -> Self {
270        HistoryRecord { head, config }
271    }
272}