raft/
log_unstable.rs

//! A representation of log entries and snapshot state that have not yet been written to storage.
2
3// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
4
5// Copyright 2015 The etcd Authors
6//
7// Licensed under the Apache License, Version 2.0 (the "License");
8// you may not use this file except in compliance with the License.
9// You may obtain a copy of the License at
10//
11//     http://www.apache.org/licenses/LICENSE-2.0
12//
13// Unless required by applicable law or agreed to in writing, software
14// distributed under the License is distributed on an "AS IS" BASIS,
15// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16// See the License for the specific language governing permissions and
17// limitations under the License.
18
19use crate::eraftpb::{Entry, Snapshot};
20use crate::util::entry_approximate_size;
21use slog::Logger;
22
/// Log entries and an optional snapshot that have not yet been written to
/// storage.
///
/// The `unstable.entries[i]` has raft log position `i+unstable.offset`.
/// Note that `unstable.offset` may be less than the highest log
/// position in storage; this means that the next write to storage
/// might need to truncate the log before persisting unstable.entries.
#[derive(Debug)]
pub struct Unstable {
    /// The incoming unstable snapshot, if any.
    pub snapshot: Option<Snapshot>,

    /// All entries that have not yet been written to storage.
    pub entries: Vec<Entry>,

    /// Running total of `entry_approximate_size` over `entries`; it is reset
    /// to zero whenever `entries` is cleared.
    pub entries_size: usize,

    /// The raft log index that corresponds to `entries[0]`.
    pub offset: u64,

    /// The logger handed to `fatal!` when an invariant is violated.
    pub logger: Logger,
}
44
45impl Unstable {
46    /// Creates a new log of unstable entries.
47    pub fn new(offset: u64, logger: Logger) -> Unstable {
48        Unstable {
49            offset,
50            snapshot: None,
51            entries: vec![],
52            entries_size: 0,
53            logger,
54        }
55    }
56
57    /// Returns the index of the first possible entry in entries
58    /// if it has a snapshot.
59    pub fn maybe_first_index(&self) -> Option<u64> {
60        self.snapshot
61            .as_ref()
62            .map(|snap| snap.get_metadata().index + 1)
63    }
64
65    /// Returns the last index if it has at least one unstable entry or snapshot.
66    pub fn maybe_last_index(&self) -> Option<u64> {
67        match self.entries.len() {
68            0 => self.snapshot.as_ref().map(|snap| snap.get_metadata().index),
69            len => Some(self.offset + len as u64 - 1),
70        }
71    }
72
73    /// Returns the term of the entry at index idx, if there is any.
74    pub fn maybe_term(&self, idx: u64) -> Option<u64> {
75        if idx < self.offset {
76            let snapshot = self.snapshot.as_ref()?;
77            let meta = snapshot.get_metadata();
78            if idx == meta.index {
79                Some(meta.term)
80            } else {
81                None
82            }
83        } else {
84            self.maybe_last_index().and_then(|last| {
85                if idx > last {
86                    return None;
87                }
88                Some(self.entries[(idx - self.offset) as usize].term)
89            })
90        }
91    }
92
93    /// Clears the unstable entries and moves the stable offset up to the
94    /// last index, if there is any.
95    pub fn stable_entries(&mut self, index: u64, term: u64) {
96        // The snapshot must be stabled before entries
97        assert!(self.snapshot.is_none());
98        if let Some(entry) = self.entries.last() {
99            if entry.get_index() != index || entry.get_term() != term {
100                fatal!(
101                    self.logger,
102                    "the last one of unstable.slice has different index {} and term {}, expect {} {}",
103                    entry.get_index(),
104                    entry.get_term(),
105                    index,
106                    term
107                );
108            }
109            self.offset = entry.get_index() + 1;
110            self.entries.clear();
111            self.entries_size = 0;
112        } else {
113            fatal!(
114                self.logger,
115                "unstable.slice is empty, expect its last one's index and term are {} and {}",
116                index,
117                term
118            );
119        }
120    }
121
122    /// Clears the unstable snapshot.
123    pub fn stable_snap(&mut self, index: u64) {
124        if let Some(snap) = &self.snapshot {
125            if snap.get_metadata().index != index {
126                fatal!(
127                    self.logger,
128                    "unstable.snap has different index {}, expect {}",
129                    snap.get_metadata().index,
130                    index
131                );
132            }
133            self.snapshot = None;
134        } else {
135            fatal!(
136                self.logger,
137                "unstable.snap is none, expect a snapshot with index {}",
138                index
139            );
140        }
141    }
142
143    /// From a given snapshot, restores the snapshot to self, but doesn't unpack.
144    pub fn restore(&mut self, snap: Snapshot) {
145        self.entries.clear();
146        self.entries_size = 0;
147        self.offset = snap.get_metadata().index + 1;
148        self.snapshot = Some(snap);
149    }
150
151    /// Append entries to unstable, truncate local block first if overlapped.
152    ///
153    /// # Panics
154    ///
155    /// Panics if truncate logs to the entry before snapshot
156    pub fn truncate_and_append(&mut self, ents: &[Entry]) {
157        let after = ents[0].index;
158        if after == self.offset + self.entries.len() as u64 {
159            // after is the next index in the self.entries, append directly
160        } else if after <= self.offset {
161            // The log is being truncated to before our current offset
162            // portion, so set the offset and replace the entries
163            self.offset = after;
164            self.entries.clear();
165            self.entries_size = 0;
166        } else {
167            // truncate to after and copy to self.entries then append
168            let off = self.offset;
169            self.must_check_outofbounds(off, after);
170            for e in &self.entries[(after - off) as usize..] {
171                self.entries_size -= entry_approximate_size(e);
172            }
173            self.entries.truncate((after - off) as usize);
174        }
175        self.entries.extend_from_slice(ents);
176        self.entries_size += ents.iter().map(entry_approximate_size).sum::<usize>();
177    }
178
179    /// Returns a slice of entries between the high and low.
180    ///
181    /// # Panics
182    ///
183    /// Panics if the `lo` or `hi` are out of bounds.
184    /// Panics if `lo > hi`.
185    pub fn slice(&self, lo: u64, hi: u64) -> &[Entry] {
186        self.must_check_outofbounds(lo, hi);
187        let l = lo as usize;
188        let h = hi as usize;
189        let off = self.offset as usize;
190        &self.entries[l - off..h - off]
191    }
192
193    /// Asserts the `hi` and `lo` values against each other and against the
194    /// entries themselves.
195    pub fn must_check_outofbounds(&self, lo: u64, hi: u64) {
196        if lo > hi {
197            fatal!(self.logger, "invalid unstable.slice {} > {}", lo, hi)
198        }
199        let upper = self.offset + self.entries.len() as u64;
200        if lo < self.offset || hi > upper {
201            fatal!(
202                self.logger,
203                "unstable.slice[{}, {}] out of bound[{}, {}]",
204                lo,
205                hi,
206                self.offset,
207                upper
208            )
209        }
210    }
211}
212
213#[cfg(test)]
214mod test {
215    use crate::eraftpb::{Entry, Snapshot, SnapshotMetadata};
216    use crate::log_unstable::Unstable;
217    use crate::util::entry_approximate_size;
218
219    fn new_entry(index: u64, term: u64) -> Entry {
220        let mut e = Entry::default();
221        e.term = term;
222        e.index = index;
223        e
224    }
225
226    fn new_snapshot(index: u64, term: u64) -> Snapshot {
227        let mut snap = Snapshot::default();
228        let mut meta = SnapshotMetadata::default();
229        meta.index = index;
230        meta.term = term;
231        snap.set_metadata(meta);
232        snap
233    }
234
235    #[test]
236    fn test_maybe_first_index() {
237        // entry, offset, snap, wok, windex,
238        let tests = vec![
239            // no snapshot
240            (Some(new_entry(5, 1)), 5, None, false, 0),
241            (None, 0, None, false, 0),
242            // has snapshot
243            (Some(new_entry(5, 1)), 5, Some(new_snapshot(4, 1)), true, 5),
244            (None, 5, Some(new_snapshot(4, 1)), true, 5),
245        ];
246
247        for (e, offset, snapshot, wok, windex) in tests {
248            let mut entries_size = 0;
249            let mut entries = vec![];
250            if let Some(entry) = e {
251                entries_size = entry_approximate_size(&entry);
252                entries = vec![entry];
253            }
254            let u = Unstable {
255                entries,
256                entries_size,
257                offset,
258                snapshot,
259                logger: crate::default_logger(),
260            };
261            let index = u.maybe_first_index();
262            match index {
263                None => assert!(!wok),
264                Some(index) => assert_eq!(index, windex),
265            }
266        }
267    }
268
269    #[test]
270    fn test_maybe_last_index() {
271        // entry, offset, snap, wok, windex,
272        let tests = vec![
273            (Some(new_entry(5, 1)), 5, None, true, 5),
274            (Some(new_entry(5, 1)), 5, Some(new_snapshot(4, 1)), true, 5),
275            // last in snapshot
276            (None, 5, Some(new_snapshot(4, 1)), true, 4),
277            // empty unstable
278            (None, 0, None, false, 0),
279        ];
280
281        for (e, offset, snapshot, wok, windex) in tests {
282            let mut entries_size = 0;
283            let mut entries = vec![];
284            if let Some(entry) = e {
285                entries_size = entry_approximate_size(&entry);
286                entries = vec![entry];
287            }
288            let u = Unstable {
289                entries,
290                entries_size,
291                offset,
292                snapshot,
293                logger: crate::default_logger(),
294            };
295            let index = u.maybe_last_index();
296            match index {
297                None => assert!(!wok),
298                Some(index) => assert_eq!(index, windex),
299            }
300        }
301    }
302
303    #[test]
304    fn test_maybe_term() {
305        // entry, offset, snap, index, wok, wterm
306        let tests = vec![
307            // term from entries
308            (Some(new_entry(5, 1)), 5, None, 5, true, 1),
309            (Some(new_entry(5, 1)), 5, None, 6, false, 0),
310            (Some(new_entry(5, 1)), 5, None, 4, false, 0),
311            (
312                Some(new_entry(5, 1)),
313                5,
314                Some(new_snapshot(4, 1)),
315                5,
316                true,
317                1,
318            ),
319            (
320                Some(new_entry(5, 1)),
321                5,
322                Some(new_snapshot(4, 1)),
323                6,
324                false,
325                0,
326            ),
327            // term from snapshot
328            (
329                Some(new_entry(5, 1)),
330                5,
331                Some(new_snapshot(4, 1)),
332                4,
333                true,
334                1,
335            ),
336            (
337                Some(new_entry(5, 1)),
338                5,
339                Some(new_snapshot(4, 1)),
340                3,
341                false,
342                0,
343            ),
344            (None, 5, Some(new_snapshot(4, 1)), 5, false, 0),
345            (None, 5, Some(new_snapshot(4, 1)), 4, true, 1),
346            (None, 0, None, 5, false, 0),
347        ];
348
349        for (e, offset, snapshot, index, wok, wterm) in tests {
350            let mut entries_size = 0;
351            let mut entries = vec![];
352            if let Some(entry) = e {
353                entries_size = entry_approximate_size(&entry);
354                entries = vec![entry];
355            }
356            let u = Unstable {
357                entries,
358                entries_size,
359                offset,
360                snapshot,
361                logger: crate::default_logger(),
362            };
363            let term = u.maybe_term(index);
364            match term {
365                None => assert!(!wok),
366                Some(term) => assert_eq!(term, wterm),
367            }
368        }
369    }
370
371    #[test]
372    fn test_restore() {
373        let mut u = Unstable {
374            entries: vec![new_entry(5, 1)],
375            entries_size: entry_approximate_size(&new_entry(5, 1)),
376            offset: 5,
377            snapshot: Some(new_snapshot(4, 1)),
378            logger: crate::default_logger(),
379        };
380
381        let s = new_snapshot(6, 2);
382        u.restore(s.clone());
383
384        assert_eq!(u.offset, s.get_metadata().index + 1);
385        assert!(u.entries.is_empty());
386        assert_eq!(u.entries_size, 0);
387        assert_eq!(u.snapshot.unwrap(), s);
388    }
389
390    #[test]
391    fn test_stable_snapshot_and_entries() {
392        let ents = vec![new_entry(5, 1), new_entry(5, 2), new_entry(6, 3)];
393        let entries_size = ents.iter().map(entry_approximate_size).sum::<usize>();
394        let mut u = Unstable {
395            entries: ents.clone(),
396            entries_size,
397            offset: 5,
398            snapshot: Some(new_snapshot(4, 1)),
399            logger: crate::default_logger(),
400        };
401        assert_eq!(ents, u.entries);
402        u.stable_snap(4);
403        u.stable_entries(6, 3);
404        assert!(u.entries.is_empty());
405        assert_eq!(u.entries_size, 0);
406        assert_eq!(u.offset, 7);
407    }
408
409    #[test]
410    fn test_truncate_and_append() {
411        // entries, offset, snap, to_append, woffset, wentries
412        let tests = vec![
413            // replace to the end
414            (
415                vec![new_entry(5, 1)],
416                5,
417                None,
418                vec![new_entry(6, 1), new_entry(7, 1)],
419                5,
420                vec![new_entry(5, 1), new_entry(6, 1), new_entry(7, 1)],
421            ),
422            // replace to unstable entries
423            (
424                vec![new_entry(5, 1)],
425                5,
426                None,
427                vec![new_entry(5, 2), new_entry(6, 2)],
428                5,
429                vec![new_entry(5, 2), new_entry(6, 2)],
430            ),
431            (
432                vec![new_entry(5, 1)],
433                5,
434                None,
435                vec![new_entry(4, 2), new_entry(5, 2), new_entry(6, 2)],
436                4,
437                vec![new_entry(4, 2), new_entry(5, 2), new_entry(6, 2)],
438            ),
439            // truncate existing entries and append
440            (
441                vec![new_entry(5, 1), new_entry(6, 1), new_entry(7, 1)],
442                5,
443                None,
444                vec![new_entry(6, 2)],
445                5,
446                vec![new_entry(5, 1), new_entry(6, 2)],
447            ),
448            (
449                vec![new_entry(5, 1), new_entry(6, 1), new_entry(7, 1)],
450                5,
451                None,
452                vec![new_entry(7, 2), new_entry(8, 2)],
453                5,
454                vec![
455                    new_entry(5, 1),
456                    new_entry(6, 1),
457                    new_entry(7, 2),
458                    new_entry(8, 2),
459                ],
460            ),
461        ];
462
463        for (entries, offset, snapshot, to_append, woffset, wentries) in tests {
464            let entries_size = entries.iter().map(entry_approximate_size).sum::<usize>();
465            let mut u = Unstable {
466                entries,
467                entries_size,
468                offset,
469                snapshot,
470                logger: crate::default_logger(),
471            };
472            u.truncate_and_append(&to_append);
473            assert_eq!(u.offset, woffset);
474            assert_eq!(u.entries, wentries);
475            let entries_size = wentries.iter().map(entry_approximate_size).sum::<usize>();
476            assert_eq!(u.entries_size, entries_size);
477        }
478    }
479}