raft/
util.rs

//! This module contains a collection of tools for manipulating and
//! controlling messages and data associated with raft.

// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.

6use std::fmt;
7use std::fmt::Write;
8use std::u64;
9
10use slog::{OwnedKVList, Record, KV};
11
12use crate::eraftpb::{Entry, Message};
13use crate::HashSet;
14use protobuf::Message as PbMessage;
15
16use slog::{b, record_static};
17
/// Sentinel meaning "no limit at all".
///
/// All bits are set, so the value equals `u64::MAX`.
pub const NO_LIMIT: u64 = !0;
20
21/// Truncates the list of entries down to a specific byte-length of
22/// all entries together.
23///
24/// # Examples
25///
26/// ```
27/// use raft::{util::limit_size, prelude::*};
28///
29/// let template = {
30///     let mut entry = Entry::default();
31///     entry.data = "*".repeat(100).into_bytes().into();
32///     entry
33/// };
34///
35/// // Make a bunch of entries that are ~100 bytes long
36/// let mut entries = vec![
37///     template.clone(),
38///     template.clone(),
39///     template.clone(),
40///     template.clone(),
41///     template.clone(),
42/// ];
43///
44/// assert_eq!(entries.len(), 5);
45/// limit_size(&mut entries, Some(220));
46/// assert_eq!(entries.len(), 2);
47///
48/// // `entries` will always have at least 1 Message
49/// limit_size(&mut entries, Some(0));
50/// assert_eq!(entries.len(), 1);
51/// ```
52pub fn limit_size<T: PbMessage + Clone>(entries: &mut Vec<T>, max: Option<u64>) {
53    if entries.len() <= 1 {
54        return;
55    }
56    let max = match max {
57        None | Some(NO_LIMIT) => return,
58        Some(max) => max,
59    };
60
61    let mut size = 0;
62    let limit = entries
63        .iter()
64        .take_while(|&e| {
65            if size == 0 {
66                size += u64::from(e.compute_size());
67                return true;
68            }
69            size += u64::from(e.compute_size());
70            size <= max
71        })
72        .count();
73
74    entries.truncate(limit);
75}
76
77/// Check whether the entry is continuous to the message.
78/// i.e msg's next entry index should be equal to the index of the first entry in `ents`
79pub fn is_continuous_ents(msg: &Message, ents: &[Entry]) -> bool {
80    if !msg.entries.is_empty() && !ents.is_empty() {
81        let expected_next_idx = msg.entries.last().unwrap().index + 1;
82        return expected_next_idx == ents.first().unwrap().index;
83    }
84    true
85}
86
87struct FormatKeyValueList {
88    pub buffer: String,
89}
90
91impl slog::Serializer for FormatKeyValueList {
92    fn emit_arguments(&mut self, key: slog::Key, val: &fmt::Arguments) -> slog::Result {
93        if !self.buffer.is_empty() {
94            write!(&mut self.buffer, ", {}: {}", key, val).unwrap();
95        } else {
96            write!(&mut self.buffer, "{}: {}", key, val).unwrap();
97        }
98        Ok(())
99    }
100}
101
102pub(crate) fn format_kv_list(kv_list: &OwnedKVList) -> String {
103    let mut formatter = FormatKeyValueList {
104        buffer: "".to_owned(),
105    };
106    let record = record_static!(slog::Level::Trace, "");
107    kv_list
108        .serialize(
109            &Record::new(&record, &format_args!(""), b!()),
110            &mut formatter,
111        )
112        .unwrap();
113    formatter.buffer
114}
115
/// Returns the smallest count that is strictly more than half of `total`.
#[inline]
pub fn majority(total: usize) -> usize {
    let half = total / 2;
    half + 1
}
121
/// A borrowed, read-only view of the union of two `HashSet<u64>`s.
///
/// Queries consult both sets directly; no merged set is ever built.
pub struct Union<'a> {
    first: &'a HashSet<u64>,
    second: &'a HashSet<u64>,
}

impl<'a> Union<'a> {
    /// Creates a view over the union of `first` and `second`.
    pub fn new(first: &'a HashSet<u64>, second: &'a HashSet<u64>) -> Union<'a> {
        Self { first, second }
    }

    /// Returns `true` if `id` belongs to at least one of the two sets.
    #[inline]
    pub fn contains(&self, id: u64) -> bool {
        if self.first.contains(&id) {
            return true;
        }
        self.second.contains(&id)
    }

    /// Returns an iterator over the distinct ids present in either set.
    ///
    /// An id contained in both sets is yielded only once.
    pub fn iter(&self) -> impl Iterator<Item = u64> + '_ {
        self.first.union(self.second).cloned()
    }

    /// Returns `true` if neither set has any members.
    pub fn is_empty(&self) -> bool {
        [self.first, self.second].iter().all(|set| set.is_empty())
    }

    /// Returns the number of distinct ids in the union.
    ///
    /// This walks the second set, which is usually empty, so the cost is
    /// linear in its size.
    pub fn len(&self) -> usize {
        let only_in_second = self
            .second
            .iter()
            .filter(|id| !self.first.contains(id))
            .count();
        self.first.len() + only_in_second
    }
}
158
159/// Get the approximate size of entry
160#[inline]
161pub fn entry_approximate_size(e: &Entry) -> usize {
162    //  message Entry {
163    //      EntryType entry_type = 1;
164    //      uint64 term = 2;
165    //      uint64 index = 3;
166    //      bytes data = 4;
167    //      bytes context = 6;
168    //      bool sync_log = 5;(Deprecated)
169    // }
170    // Each field has tag(1 byte) if it's not default value.
171    // Tips: x bytes can represent a value up to 1 << x*7 - 1,
172    // So 1 byte => 127, 2 bytes => 16383, 3 bytes => 2097151.
173    // If entry_type is normal(default), in general, the size should
174    // be tag(4) + term(1) + index(2) + data(2) + context(1) = 10.
175    // If entry_type is conf change, in general, the size should be
176    // tag(5) + entry_type(1) + term(1) + index(2) + data(1) + context(1) = 11.
177    // We choose 12 in case of large index or large data for normal entry.
178    e.data.len() + e.context.len() + 12
179}