#![cfg_attr(feature = "nightly", feature(test))]
#![cfg_attr(feature = "swap", feature(allocator_api))]
#![cfg_attr(feature = "swap", feature(slice_ptr_get))]
#![cfg_attr(feature = "swap", feature(nonnull_slice_from_raw_parts))]
#![cfg_attr(feature = "swap", feature(slice_ptr_len))]
#![cfg_attr(feature = "swap", feature(alloc_layout_extra))]
#![cfg_attr(all(test, feature = "swap"), feature(alloc_error_hook))]
#![cfg_attr(all(test, feature = "swap"), feature(cfg_sanitize))]
#[macro_use]
extern crate lazy_static;
extern crate scopeguard;
#[cfg(feature = "nightly")]
extern crate test;
macro_rules! box_err {
($e:expr) => ({
use std::error::Error;
let e: Box<dyn Error + Send + Sync> = format!("[{}:{}]: {}", file!(), line!(), $e).into();
e.into()
});
($f:tt, $($arg:expr),+) => ({
box_err!(format!($f, $($arg),+))
});
}
mod codec;
mod config;
mod consistency;
mod engine;
mod errors;
mod event_listener;
mod file_pipe_log;
#[cfg(feature = "scripting")]
mod filter;
mod fork;
mod log_batch;
mod memtable;
mod metrics;
mod pipe_log;
mod purge;
#[cfg(feature = "swap")]
mod swappy_allocator;
#[cfg(test)]
mod test_util;
mod util;
mod write_barrier;
pub mod env;
pub use config::{Config, RecoveryMode};
pub use engine::Engine;
pub use errors::{Error, Result};
pub use log_batch::{Command, LogBatch, MessageExt};
pub use metrics::{get_perf_context, set_perf_context, take_perf_context, PerfContext};
pub use pipe_log::Version;
pub use util::ReadableSize;
#[cfg(feature = "internals")]
pub mod internals {
pub use crate::event_listener::*;
pub use crate::file_pipe_log::*;
pub use crate::memtable::*;
pub use crate::pipe_log::*;
pub use crate::purge::*;
#[cfg(feature = "swap")]
pub use crate::swappy_allocator::*;
pub use crate::write_barrier::*;
}
use std::sync::atomic::{AtomicUsize, Ordering};
#[derive(Default)]
pub struct GlobalStats {
live_append_entries: AtomicUsize,
rewrite_entries: AtomicUsize,
deleted_rewrite_entries: AtomicUsize,
}
impl GlobalStats {
#[inline]
pub fn add(&self, queue: pipe_log::LogQueue, count: usize) {
match queue {
pipe_log::LogQueue::Append => {
self.live_append_entries.fetch_add(count, Ordering::Relaxed);
}
pipe_log::LogQueue::Rewrite => {
self.rewrite_entries.fetch_add(count, Ordering::Relaxed);
}
}
}
#[inline]
pub fn delete(&self, queue: pipe_log::LogQueue, count: usize) {
match queue {
pipe_log::LogQueue::Append => {
self.live_append_entries.fetch_sub(count, Ordering::Relaxed);
}
pipe_log::LogQueue::Rewrite => {
self.deleted_rewrite_entries
.fetch_add(count, Ordering::Relaxed);
}
}
}
#[inline]
pub fn rewrite_entries(&self) -> usize {
self.rewrite_entries.load(Ordering::Relaxed)
}
#[inline]
pub fn deleted_rewrite_entries(&self) -> usize {
self.deleted_rewrite_entries.load(Ordering::Relaxed)
}
#[inline]
pub fn reset_rewrite_counters(&self) {
let dop = self.deleted_rewrite_entries.load(Ordering::Relaxed);
self.deleted_rewrite_entries
.fetch_sub(dop, Ordering::Relaxed);
self.rewrite_entries.fetch_sub(dop, Ordering::Relaxed);
}
#[inline]
pub fn live_entries(&self, queue: pipe_log::LogQueue) -> usize {
match queue {
pipe_log::LogQueue::Append => self.live_append_entries.load(Ordering::Relaxed),
pipe_log::LogQueue::Rewrite => {
let op = self.rewrite_entries.load(Ordering::Relaxed);
let dop = self.deleted_rewrite_entries.load(Ordering::Relaxed);
debug_assert!(op >= dop);
op.saturating_sub(dop)
}
}
}
#[inline]
pub fn flush_metrics(&self) {
metrics::LOG_ENTRY_COUNT
.rewrite
.set(self.live_entries(pipe_log::LogQueue::Rewrite) as i64);
metrics::LOG_ENTRY_COUNT
.append
.set(self.live_entries(pipe_log::LogQueue::Append) as i64);
}
}
pub(crate) const INTERNAL_KEY_PREFIX: &[u8] = b"__";
#[inline]
#[cfg(test)]
pub(crate) fn make_internal_key(k: &[u8]) -> Vec<u8> {
assert!(!k.is_empty());
let mut v = INTERNAL_KEY_PREFIX.to_vec();
v.extend_from_slice(k);
v
}
#[cfg(not(test))]
pub(crate) fn make_internal_key(k: &[u8]) -> Vec<u8> {
use log_batch::ATOMIC_GROUP_KEY;
assert!(k == ATOMIC_GROUP_KEY);
let mut v = INTERNAL_KEY_PREFIX.to_vec();
v.extend_from_slice(k);
v
}
#[inline]
#[cfg(test)]
pub(crate) fn is_internal_key(s: &[u8], ext: Option<&[u8]>) -> bool {
if let Some(ext) = ext {
s.len() == INTERNAL_KEY_PREFIX.len() + ext.len()
&& s[..INTERNAL_KEY_PREFIX.len()] == *INTERNAL_KEY_PREFIX
&& s[INTERNAL_KEY_PREFIX.len()..] == *ext
} else {
s.len() > INTERNAL_KEY_PREFIX.len()
&& s[..INTERNAL_KEY_PREFIX.len()] == *INTERNAL_KEY_PREFIX
}
}
#[inline]
#[cfg(not(test))]
pub(crate) fn is_internal_key(s: &[u8], ext: Option<&[u8]>) -> bool {
use log_batch::ATOMIC_GROUP_KEY;
if let Some(ext) = ext {
s.len() == INTERNAL_KEY_PREFIX.len() + ext.len()
&& s[..INTERNAL_KEY_PREFIX.len()] == *INTERNAL_KEY_PREFIX
&& s[INTERNAL_KEY_PREFIX.len()..] == *ext
} else {
is_internal_key(s, Some(ATOMIC_GROUP_KEY))
}
}
#[cfg(test)]
mod tests {
use crate::log_batch::MessageExt;
use raft::eraftpb::Entry;
#[ctor::ctor]
fn init() {
env_logger::init();
}
impl MessageExt for Entry {
type Entry = Entry;
fn index(e: &Self::Entry) -> u64 {
e.index
}
}
#[test]
fn test_internal_key() {
let key = crate::make_internal_key(&[0]);
assert!(crate::is_internal_key(&key, None));
assert!(crate::is_internal_key(&key, Some(&[0])));
assert!(!crate::is_internal_key(&key, Some(&[1])));
}
}