1#![cfg_attr(feature = "nightly", feature(test))]
17#![cfg_attr(feature = "swap", feature(allocator_api))]
18#![cfg_attr(feature = "swap", feature(slice_ptr_get))]
19#![cfg_attr(feature = "swap", feature(nonnull_slice_from_raw_parts))]
22#![cfg_attr(feature = "swap", feature(slice_ptr_len))]
23#![cfg_attr(feature = "swap", feature(alloc_layout_extra))]
24#![cfg_attr(all(test, feature = "swap"), feature(alloc_error_hook))]
25#![cfg_attr(all(test, feature = "swap"), feature(cfg_sanitize))]
26
27#[macro_use]
28extern crate lazy_static;
29extern crate scopeguard;
30#[cfg(feature = "nightly")]
31extern crate test;
32
/// Builds a boxed error whose message is prefixed with the call site's file and line
/// (`[file:line]: message`) and converts it with `.into()` into the error type the
/// surrounding context expects.
///
/// Two forms are accepted, each with an optional trailing comma:
/// - `box_err!(expr)`: `expr` is rendered with `{}`.
/// - `box_err!("fmt", args...)`: the arguments are rendered with `format!` first and the
///   resulting string is passed to the single-expression form.
macro_rules! box_err {
    ($e:expr $(,)?) => ({
        // The trait is named by its absolute path instead of being imported: a `use`
        // inside this block would also be visible to `$e` and would shadow any `Error`
        // item the caller refers to in that expression.
        let e: Box<dyn ::std::error::Error + Send + Sync> =
            format!("[{}:{}]: {}", file!(), line!(), $e).into();
        e.into()
    });
    ($f:tt, $($arg:expr),+ $(,)?) => ({
        box_err!(format!($f, $($arg),+))
    });
}
43
44mod codec;
45mod config;
46mod consistency;
47mod engine;
48mod errors;
49mod event_listener;
50mod file_pipe_log;
51#[cfg(feature = "scripting")]
52mod filter;
53mod fork;
54mod log_batch;
55mod memtable;
56mod metrics;
57mod pipe_log;
58mod purge;
59#[cfg(feature = "swap")]
60mod swappy_allocator;
61#[cfg(test)]
62mod test_util;
63mod util;
64mod write_barrier;
65
66pub mod env;
67
68pub use config::{Config, RecoveryMode};
69pub use engine::Engine;
70pub use errors::{Error, Result};
71pub use log_batch::{Command, LogBatch, MessageExt};
72pub use metrics::{get_perf_context, set_perf_context, take_perf_context, PerfContext};
73pub use pipe_log::Version;
74pub use util::ReadableSize;
75
/// Public re-exports of the contents of several otherwise private modules.
///
/// Compiled only when the `internals` feature is enabled; the `swappy_allocator`
/// re-export additionally requires the `swap` feature.
#[cfg(feature = "internals")]
pub mod internals {
    #[cfg(feature = "swap")]
    pub use crate::swappy_allocator::*;
    pub use crate::{
        event_listener::*,
        file_pipe_log::*,
        memtable::*,
        pipe_log::*,
        purge::*,
        write_barrier::*,
    };
}
89
90use std::sync::atomic::{AtomicUsize, Ordering};
91
92#[derive(Default)]
93pub struct GlobalStats {
94 live_append_entries: AtomicUsize,
95 rewrite_entries: AtomicUsize,
96 deleted_rewrite_entries: AtomicUsize,
97}
98
99impl GlobalStats {
100 #[inline]
101 pub fn add(&self, queue: pipe_log::LogQueue, count: usize) {
102 match queue {
103 pipe_log::LogQueue::Append => {
104 self.live_append_entries.fetch_add(count, Ordering::Relaxed);
105 }
106 pipe_log::LogQueue::Rewrite => {
107 self.rewrite_entries.fetch_add(count, Ordering::Relaxed);
108 }
109 }
110 }
111
112 #[inline]
113 pub fn delete(&self, queue: pipe_log::LogQueue, count: usize) {
114 match queue {
115 pipe_log::LogQueue::Append => {
116 self.live_append_entries.fetch_sub(count, Ordering::Relaxed);
117 }
118 pipe_log::LogQueue::Rewrite => {
119 self.deleted_rewrite_entries
120 .fetch_add(count, Ordering::Relaxed);
121 }
122 }
123 }
124
125 #[inline]
126 pub fn rewrite_entries(&self) -> usize {
127 self.rewrite_entries.load(Ordering::Relaxed)
128 }
129
130 #[inline]
131 pub fn deleted_rewrite_entries(&self) -> usize {
132 self.deleted_rewrite_entries.load(Ordering::Relaxed)
133 }
134
135 #[inline]
136 pub fn reset_rewrite_counters(&self) {
137 let dop = self.deleted_rewrite_entries.load(Ordering::Relaxed);
138 self.deleted_rewrite_entries
139 .fetch_sub(dop, Ordering::Relaxed);
140 self.rewrite_entries.fetch_sub(dop, Ordering::Relaxed);
141 }
142
143 #[inline]
144 pub fn live_entries(&self, queue: pipe_log::LogQueue) -> usize {
145 match queue {
146 pipe_log::LogQueue::Append => self.live_append_entries.load(Ordering::Relaxed),
147 pipe_log::LogQueue::Rewrite => {
148 let op = self.rewrite_entries.load(Ordering::Relaxed);
149 let dop = self.deleted_rewrite_entries.load(Ordering::Relaxed);
150 debug_assert!(op >= dop);
151 op.saturating_sub(dop)
152 }
153 }
154 }
155
156 #[inline]
157 pub fn flush_metrics(&self) {
158 metrics::LOG_ENTRY_COUNT
159 .rewrite
160 .set(self.live_entries(pipe_log::LogQueue::Rewrite) as i64);
161 metrics::LOG_ENTRY_COUNT
162 .append
163 .set(self.live_entries(pipe_log::LogQueue::Append) as i64);
164 }
165}
166
/// Byte prefix (`__`) that `make_internal_key` prepends and `is_internal_key` checks for.
pub(crate) const INTERNAL_KEY_PREFIX: &[u8] = b"__";
168
/// Test-build variant: returns `INTERNAL_KEY_PREFIX` followed by `k`.
///
/// # Panics
///
/// Panics if `k` is empty.
#[inline]
#[cfg(test)]
pub(crate) fn make_internal_key(k: &[u8]) -> Vec<u8> {
    assert!(!k.is_empty());
    [INTERNAL_KEY_PREFIX, k].concat()
}
177
178#[cfg(not(test))]
179pub(crate) fn make_internal_key(k: &[u8]) -> Vec<u8> {
180 use log_batch::ATOMIC_GROUP_KEY;
181
182 assert!(k == ATOMIC_GROUP_KEY);
183 let mut v = INTERNAL_KEY_PREFIX.to_vec();
184 v.extend_from_slice(k);
185 v
186}
187
/// Test-build variant: reports whether `s` is an internal key.
///
/// `s` must start with `INTERNAL_KEY_PREFIX`. With `ext = Some(e)` the bytes after the
/// prefix must equal `e` exactly; with `ext = None` any non-empty remainder is accepted.
#[inline]
#[cfg(test)]
pub(crate) fn is_internal_key(s: &[u8], ext: Option<&[u8]>) -> bool {
    if !s.starts_with(INTERNAL_KEY_PREFIX) {
        return false;
    }
    // Safe to slice: `starts_with` guarantees `s` is at least as long as the prefix.
    let suffix = &s[INTERNAL_KEY_PREFIX.len()..];
    match ext {
        Some(expected) => suffix == expected,
        None => !suffix.is_empty(),
    }
}
205
206#[inline]
207#[cfg(not(test))]
208pub(crate) fn is_internal_key(s: &[u8], ext: Option<&[u8]>) -> bool {
209 use log_batch::ATOMIC_GROUP_KEY;
210
211 if let Some(ext) = ext {
212 s.len() == INTERNAL_KEY_PREFIX.len() + ext.len()
213 && s[..INTERNAL_KEY_PREFIX.len()] == *INTERNAL_KEY_PREFIX
214 && s[INTERNAL_KEY_PREFIX.len()..] == *ext
215 } else {
216 is_internal_key(s, Some(ATOMIC_GROUP_KEY))
217 }
218}
219
#[cfg(test)]
mod tests {
    use crate::log_batch::MessageExt;
    use raft::eraftpb::Entry;

    /// Initializes `env_logger`; registered through `#[ctor::ctor]`
    /// (presumably so it runs once before any test, confirm against the `ctor` crate).
    #[ctor::ctor]
    fn init() {
        env_logger::init();
    }

    /// Lets `raft` entries be used as this crate's message type in tests.
    impl MessageExt for Entry {
        type Entry = Entry;

        fn index(entry: &Self::Entry) -> u64 {
            entry.index
        }
    }

    /// A key built from `[0]` is internal, matches the suffix `[0]`, and rejects `[1]`.
    #[test]
    fn test_internal_key() {
        use crate::{is_internal_key, make_internal_key};

        let internal = make_internal_key(&[0]);
        assert!(is_internal_key(&internal, None));
        assert!(is_internal_key(&internal, Some(&[0])));
        assert!(!is_internal_key(&internal, Some(&[1])));
    }
}