raft_engine/
lib.rs

1// Copyright (c) 2017-present, PingCAP, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14//! # Raft Engine
15
16#![cfg_attr(feature = "nightly", feature(test))]
17#![cfg_attr(feature = "swap", feature(allocator_api))]
18#![cfg_attr(feature = "swap", feature(slice_ptr_get))]
19// Though the new nightly rust stablized this feature, keep it anyway
20// because some other project (like TiKV) is still using the old.
21#![cfg_attr(feature = "swap", feature(nonnull_slice_from_raw_parts))]
22#![cfg_attr(feature = "swap", feature(slice_ptr_len))]
23#![cfg_attr(feature = "swap", feature(alloc_layout_extra))]
24#![cfg_attr(all(test, feature = "swap"), feature(alloc_error_hook))]
25#![cfg_attr(all(test, feature = "swap"), feature(cfg_sanitize))]
26
27#[macro_use]
28extern crate lazy_static;
29extern crate scopeguard;
30#[cfg(feature = "nightly")]
31extern crate test;
32
33macro_rules! box_err {
34    ($e:expr) => ({
35        use std::error::Error;
36        let e: Box<dyn Error + Send + Sync> = format!("[{}:{}]: {}", file!(), line!(),  $e).into();
37        e.into()
38    });
39    ($f:tt, $($arg:expr),+) => ({
40        box_err!(format!($f, $($arg),+))
41    });
42}
43
44mod codec;
45mod config;
46mod consistency;
47mod engine;
48mod errors;
49mod event_listener;
50mod file_pipe_log;
51#[cfg(feature = "scripting")]
52mod filter;
53mod fork;
54mod log_batch;
55mod memtable;
56mod metrics;
57mod pipe_log;
58mod purge;
59#[cfg(feature = "swap")]
60mod swappy_allocator;
61#[cfg(test)]
62mod test_util;
63mod util;
64mod write_barrier;
65
66pub mod env;
67
68pub use config::{Config, RecoveryMode};
69pub use engine::Engine;
70pub use errors::{Error, Result};
71pub use log_batch::{Command, LogBatch, MessageExt};
72pub use metrics::{get_perf_context, set_perf_context, take_perf_context, PerfContext};
73pub use pipe_log::Version;
74pub use util::ReadableSize;
75
76#[cfg(feature = "internals")]
77pub mod internals {
78    //! A selective view of key components in Raft Engine. Exported under the
79    //! `internals` feature only.
80    pub use crate::event_listener::*;
81    pub use crate::file_pipe_log::*;
82    pub use crate::memtable::*;
83    pub use crate::pipe_log::*;
84    pub use crate::purge::*;
85    #[cfg(feature = "swap")]
86    pub use crate::swappy_allocator::*;
87    pub use crate::write_barrier::*;
88}
89
90use std::sync::atomic::{AtomicUsize, Ordering};
91
92#[derive(Default)]
93pub struct GlobalStats {
94    live_append_entries: AtomicUsize,
95    rewrite_entries: AtomicUsize,
96    deleted_rewrite_entries: AtomicUsize,
97}
98
99impl GlobalStats {
100    #[inline]
101    pub fn add(&self, queue: pipe_log::LogQueue, count: usize) {
102        match queue {
103            pipe_log::LogQueue::Append => {
104                self.live_append_entries.fetch_add(count, Ordering::Relaxed);
105            }
106            pipe_log::LogQueue::Rewrite => {
107                self.rewrite_entries.fetch_add(count, Ordering::Relaxed);
108            }
109        }
110    }
111
112    #[inline]
113    pub fn delete(&self, queue: pipe_log::LogQueue, count: usize) {
114        match queue {
115            pipe_log::LogQueue::Append => {
116                self.live_append_entries.fetch_sub(count, Ordering::Relaxed);
117            }
118            pipe_log::LogQueue::Rewrite => {
119                self.deleted_rewrite_entries
120                    .fetch_add(count, Ordering::Relaxed);
121            }
122        }
123    }
124
125    #[inline]
126    pub fn rewrite_entries(&self) -> usize {
127        self.rewrite_entries.load(Ordering::Relaxed)
128    }
129
130    #[inline]
131    pub fn deleted_rewrite_entries(&self) -> usize {
132        self.deleted_rewrite_entries.load(Ordering::Relaxed)
133    }
134
135    #[inline]
136    pub fn reset_rewrite_counters(&self) {
137        let dop = self.deleted_rewrite_entries.load(Ordering::Relaxed);
138        self.deleted_rewrite_entries
139            .fetch_sub(dop, Ordering::Relaxed);
140        self.rewrite_entries.fetch_sub(dop, Ordering::Relaxed);
141    }
142
143    #[inline]
144    pub fn live_entries(&self, queue: pipe_log::LogQueue) -> usize {
145        match queue {
146            pipe_log::LogQueue::Append => self.live_append_entries.load(Ordering::Relaxed),
147            pipe_log::LogQueue::Rewrite => {
148                let op = self.rewrite_entries.load(Ordering::Relaxed);
149                let dop = self.deleted_rewrite_entries.load(Ordering::Relaxed);
150                debug_assert!(op >= dop);
151                op.saturating_sub(dop)
152            }
153        }
154    }
155
156    #[inline]
157    pub fn flush_metrics(&self) {
158        metrics::LOG_ENTRY_COUNT
159            .rewrite
160            .set(self.live_entries(pipe_log::LogQueue::Rewrite) as i64);
161        metrics::LOG_ENTRY_COUNT
162            .append
163            .set(self.live_entries(pipe_log::LogQueue::Append) as i64);
164    }
165}
166
167pub(crate) const INTERNAL_KEY_PREFIX: &[u8] = b"__";
168
169#[inline]
170#[cfg(test)]
171pub(crate) fn make_internal_key(k: &[u8]) -> Vec<u8> {
172    assert!(!k.is_empty());
173    let mut v = INTERNAL_KEY_PREFIX.to_vec();
174    v.extend_from_slice(k);
175    v
176}
177
178#[cfg(not(test))]
179pub(crate) fn make_internal_key(k: &[u8]) -> Vec<u8> {
180    use log_batch::ATOMIC_GROUP_KEY;
181
182    assert!(k == ATOMIC_GROUP_KEY);
183    let mut v = INTERNAL_KEY_PREFIX.to_vec();
184    v.extend_from_slice(k);
185    v
186}
187
188/// We ensure internal keys are not visible to the user by:
189/// (1) Writing internal keys will be rejected by `LogBatch::put`.
190/// (2) Internal keys are filtered out during apply and replay of both queues.
191/// This also makes sure future internal keys under the prefix won't become
192/// visible after downgrading.
193#[inline]
194#[cfg(test)]
195pub(crate) fn is_internal_key(s: &[u8], ext: Option<&[u8]>) -> bool {
196    if let Some(ext) = ext {
197        s.len() == INTERNAL_KEY_PREFIX.len() + ext.len()
198            && s[..INTERNAL_KEY_PREFIX.len()] == *INTERNAL_KEY_PREFIX
199            && s[INTERNAL_KEY_PREFIX.len()..] == *ext
200    } else {
201        s.len() > INTERNAL_KEY_PREFIX.len()
202            && s[..INTERNAL_KEY_PREFIX.len()] == *INTERNAL_KEY_PREFIX
203    }
204}
205
206#[inline]
207#[cfg(not(test))]
208pub(crate) fn is_internal_key(s: &[u8], ext: Option<&[u8]>) -> bool {
209    use log_batch::ATOMIC_GROUP_KEY;
210
211    if let Some(ext) = ext {
212        s.len() == INTERNAL_KEY_PREFIX.len() + ext.len()
213            && s[..INTERNAL_KEY_PREFIX.len()] == *INTERNAL_KEY_PREFIX
214            && s[INTERNAL_KEY_PREFIX.len()..] == *ext
215    } else {
216        is_internal_key(s, Some(ATOMIC_GROUP_KEY))
217    }
218}
219
220#[cfg(test)]
221mod tests {
222    use crate::log_batch::MessageExt;
223    use raft::eraftpb::Entry;
224
225    #[ctor::ctor]
226    fn init() {
227        env_logger::init();
228    }
229
230    impl MessageExt for Entry {
231        type Entry = Entry;
232
233        fn index(e: &Self::Entry) -> u64 {
234            e.index
235        }
236    }
237
238    #[test]
239    fn test_internal_key() {
240        let key = crate::make_internal_key(&[0]);
241        assert!(crate::is_internal_key(&key, None));
242        assert!(crate::is_internal_key(&key, Some(&[0])));
243        assert!(!crate::is_internal_key(&key, Some(&[1])));
244    }
245}