#![allow(unsafe_code)]
pub mod constants;
pub mod logger;
mod blob_io;
mod disk_pointer;
mod iobuf;
mod iterator;
mod parallel_io;
mod reader;
mod reservation;
mod segment;
mod snapshot;
use crate::*;
use std::{borrow::Cow, collections::BinaryHeap, ops::Deref};
use self::{
blob_io::{gc_blobs, read_blob, remove_blob, write_blob},
constants::{
BATCH_MANIFEST_PID, CONFIG_PID, COUNTER_PID, META_PID,
PAGE_CONSOLIDATION_THRESHOLD,
},
iobuf::{IoBuf, IoBufs},
iterator::{raw_segment_iter_from, LogIter},
parallel_io::Pio,
segment::SegmentAccountant,
snapshot::advance_snapshot,
};
pub(crate) use self::{
logger::{MessageHeader, SegmentHeader},
reader::{read_message, read_segment_header},
reservation::Reservation,
snapshot::{read_snapshot_or_default, PageState, Snapshot},
};
pub use self::{
constants::{
BATCH_MANIFEST_INLINE_LEN, BLOB_INLINE_LEN, MAX_SPACE_AMPLIFICATION,
MINIMUM_ITEMS_PER_SEGMENT, MSG_HEADER_LEN, SEG_HEADER_LEN,
},
disk_pointer::DiskPtr,
logger::{Log, LogRead},
segment::SegmentMode,
};
/// Index of a segment within the log file.
pub type SegmentId = usize;
/// A byte offset into the log file.
pub type LogOffset = u64;
/// Blobs are addressed by the `Lsn` of the log message that created them.
pub type BlobPointer = Lsn;
/// A log sequence number: a monotonic logical position in the log.
pub type Lsn = i64;
/// Identifier for a logical page in the pagecache.
pub type PageId = u64;
/// Tag identifying the type of a message in the log.
///
/// The discriminant values are the literal bytes written to disk
/// (see `into` / `From<u8>`), so they must never be renumbered.
/// `Inline*` payloads are stored directly in the log; `Blob*`
/// payloads live in separate blob files referenced from the log.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
#[repr(u8)]
pub enum MessageKind {
    /// Unrecognized or damaged message.
    Corrupted = 0,
    /// A reservation that was aborted before being written.
    Cancelled = 1,
    /// Padding filling out the tail of a segment.
    Pad = 2,
    /// Records the maximum lsn of an atomic write batch.
    BatchManifest = 3,
    /// A page free operation.
    Free = 4,
    /// The monotonic ID-generator counter page.
    Counter = 5,
    InlineMeta = 6,
    BlobMeta = 7,
    InlineConfig = 8,
    BlobConfig = 9,
    InlineReplace = 10,
    BlobReplace = 11,
    InlineAppend = 12,
    BlobAppend = 13,
}
impl MessageKind {
    /// Returns the on-disk tag byte for this message kind.
    pub(crate) const fn into(self) -> u8 {
        self as u8
    }
}
impl From<u8> for MessageKind {
fn from(byte: u8) -> Self {
use MessageKind::*;
match byte {
0 => Corrupted,
1 => Cancelled,
2 => Pad,
3 => BatchManifest,
4 => Free,
5 => Counter,
6 => InlineMeta,
7 => BlobMeta,
8 => InlineConfig,
9 => BlobConfig,
10 => InlineReplace,
11 => BlobReplace,
12 => InlineAppend,
13 => BlobAppend,
other => {
debug!("encountered unexpected message kind byte {}", other);
Corrupted
}
}
}
}
/// Coarse classification of a log message, used by the segment
/// accountant and recovery to decide how an entry affects page state.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum LogKind {
    /// Entirely replaces a page's contents.
    Replace,
    /// Adds a delta on top of a page's existing contents.
    Append,
    /// Frees a page.
    Free,
    /// Ignored for page state (padding, cancelled writes, manifests).
    Skip,
    /// Unreadable or unrecognized entry.
    Corrupted,
}
fn log_kind_from_update(update: &Update) -> LogKind {
use Update::*;
match update {
Free => LogKind::Free,
Append(..) => LogKind::Append,
Compact(..) | Counter(..) | Meta(..) | Config(..) => LogKind::Replace,
}
}
impl From<MessageKind> for LogKind {
fn from(kind: MessageKind) -> Self {
use MessageKind::*;
match kind {
Free => LogKind::Free,
InlineReplace | Counter | BlobReplace | InlineMeta | BlobMeta
| InlineConfig | BlobConfig => LogKind::Replace,
InlineAppend | BlobAppend => LogKind::Append,
Cancelled | Pad | BatchManifest => LogKind::Skip,
other => {
debug!("encountered unexpected message kind byte {:?}", other);
LogKind::Corrupted
}
}
}
}
/// Converts `from` into a `usize`, panicking if the value cannot be
/// represented losslessly on this platform.
fn assert_usize<T>(from: T) -> usize
where
    usize: TryFrom<T, Error = std::num::TryFromIntError>,
{
    let converted = usize::try_from(from);
    converted.expect("lost data cast while converting to usize")
}
/// Raises `atomic_lsn` to `to` if it is currently lower. Safe under
/// concurrency: the stored value only ever increases, and a racing
/// bump to a higher value simply makes this call a no-op.
fn bump_atomic_lsn(atomic_lsn: &AtomicLsn, to: Lsn) {
    let mut current = atomic_lsn.load(SeqCst);
    // Retry until the stored value is already >= `to`, or our
    // compare-exchange installs `to`. `compare_exchange` replaces the
    // deprecated `compare_and_swap` with identical semantics here.
    while current < to {
        match atomic_lsn.compare_exchange(current, to, SeqCst, SeqCst) {
            // We installed the higher lsn.
            Ok(_) => return,
            // Someone else changed it first; re-check their value.
            Err(actual) => current = actual,
        }
    }
}
use std::convert::{TryFrom, TryInto};
#[cfg(feature = "compression")]
use zstd::block::decompress;
#[inline]
pub(crate) fn lsn_to_arr(number: Lsn) -> [u8; 8] {
number.to_le_bytes()
}
/// Deserializes an 8-byte little-endian slice into an `Lsn`.
/// Panics if `arr` is not exactly 8 bytes long.
#[inline]
pub(crate) fn arr_to_lsn(arr: &[u8]) -> Lsn {
    let bytes: [u8; 8] = arr.try_into().unwrap();
    Lsn::from_le_bytes(bytes)
}
/// Serializes a `u64` into its 8-byte little-endian on-disk form.
#[inline]
pub(crate) fn u64_to_arr(number: u64) -> [u8; 8] {
    u64::to_le_bytes(number)
}
/// Deserializes an 8-byte little-endian slice into a `u64`.
/// Panics if `arr` is not exactly 8 bytes long.
#[inline]
pub(crate) fn arr_to_u64(arr: &[u8]) -> u64 {
    let bytes: [u8; 8] = arr.try_into().unwrap();
    u64::from_le_bytes(bytes)
}
/// Deserializes a 4-byte little-endian slice into a `u32`.
/// Panics if `arr` is not exactly 4 bytes long.
#[inline]
pub(crate) fn arr_to_u32(arr: &[u8]) -> u32 {
    let bytes: [u8; 4] = arr.try_into().unwrap();
    u32::from_le_bytes(bytes)
}
/// Serializes a `u32` into its 4-byte little-endian on-disk form.
#[inline]
pub(crate) fn u32_to_arr(number: u32) -> [u8; 4] {
    u32::to_le_bytes(number)
}
/// Decompresses `buf` with zstd when the `compression` feature is
/// enabled; otherwise returns the buffer unchanged.
///
/// zstd's block API requires an upper bound on the decompressed size,
/// so a crate-wide worst-case compression ratio is tracked and grown
/// each time decompression reports a too-small destination buffer.
#[allow(clippy::needless_pass_by_value)]
pub(crate) fn maybe_decompress(buf: Vec<u8>) -> std::io::Result<Vec<u8>> {
    #[cfg(feature = "compression")]
    {
        use super::*;
        // Highest compression ratio observed so far; starts at 1 and
        // is only ever increased.
        static MAX_COMPRESSION_RATIO: AtomicUsize = AtomicUsize::new(1);
        let _measure = Measure::new(&M.decompress);
        loop {
            let ratio = MAX_COMPRESSION_RATIO.load(Acquire);
            match decompress(&*buf, buf.len() * ratio) {
                // NOTE(review): `Other` is assumed to be how zstd
                // signals a too-small destination buffer — bump the
                // ratio and retry with a larger bound.
                Err(ref e) if e.kind() == std::io::ErrorKind::Other => {
                    debug!(
                        "bumping expected compression \
                         ratio up from {} to {}: {:?}",
                        ratio,
                        ratio + 1,
                        e
                    );
                    // A lost race is fine; the ratio is re-read at the
                    // top of the next loop iteration either way.
                    let _who_cares = MAX_COMPRESSION_RATIO.compare_and_swap(
                        ratio,
                        ratio + 1,
                        Release,
                    );
                }
                other => return other,
            }
        }
    }
    #[cfg(not(feature = "compression"))]
    Ok(buf)
}
/// A guarded shared pointer to a node in a page's lock-free update stack.
type PagePtrInner<'g> = Shared<'g, stack::Node<(Option<Update>, CacheInfo)>>;
/// A pointer into a page's update stack, paired with the timestamp
/// observed when it was read. The timestamp lets CAS operations detect
/// that a page changed even if a node address was reused.
#[derive(Debug, Clone, PartialEq, Copy)]
pub struct PagePtr<'g> {
    // The stack head that was current when this pointer was created.
    cached_pointer: PagePtrInner<'g>,
    // Page timestamp at read time, for optimistic validation.
    ts: u64,
}
impl<'g> PagePtr<'g> {
    /// Returns the lsn of the update at this pointer's stack head.
    pub fn last_lsn(&self) -> Lsn {
        // SAFETY(review): the guard lifetime 'g keeps the node alive;
        // assumes the pointer is non-null (taken from a live stack
        // head) — confirm callers never hold a null `cached_pointer`.
        unsafe { self.cached_pointer.deref().deref().1.lsn }
    }
}
// SAFETY(review): assumed sound because the epoch guard pins the
// referenced node for 'g — confirm against the crossbeam-epoch
// `Shared` contract.
unsafe impl<'g> Send for PagePtr<'g> {}
unsafe impl<'g> Sync for PagePtr<'g> {}
/// Location and bookkeeping metadata for one logged page update.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct CacheInfo {
    /// Page timestamp when this update was installed.
    pub ts: u64,
    /// Log sequence number of the update's log entry.
    pub lsn: Lsn,
    /// Where the payload lives on disk (inline in the log, or blob).
    pub pointer: DiskPtr,
    /// Bytes the entry occupies in the log, for space accounting.
    pub log_size: usize,
}
/// An in-memory page mutation, as stored in a page's update stack.
#[derive(Clone, Debug, PartialEq)]
pub(crate) enum Update {
    /// A delta appended on top of an existing page.
    Append(Frag),
    /// A full, consolidated replacement of a page's contents.
    Compact(Frag),
    /// Marks the page as freed.
    Free,
    /// The persisted ID-generator counter value.
    Counter(u64),
    /// The metadata page mapping collection names to root pids.
    Meta(Meta),
    /// Persisted storage configuration parameters.
    Config(StorageParameters),
}
impl Update {
    /// Consumes the update and returns its frag.
    /// Panics unless this is an `Append` or `Compact`.
    fn into_frag(self) -> Frag {
        match self {
            Update::Append(frag) | Update::Compact(frag) => frag,
            other => {
                panic!("called into_frag on non-Append/Compact: {:?}", other)
            }
        }
    }

    /// Borrows the contained frag.
    /// Panics unless this is an `Append` or `Compact`.
    fn as_frag(&self) -> &Frag {
        match self {
            Update::Append(frag) | Update::Compact(frag) => frag,
            other => {
                panic!("called as_frag on non-Append/Compact: {:?}", other)
            }
        }
    }

    /// Returns `true` for `Compact` updates.
    fn is_compact(&self) -> bool {
        match self {
            Update::Compact(_) => true,
            _ => false,
        }
    }

    /// Returns `true` for `Free` updates.
    fn is_free(&self) -> bool {
        match self {
            Update::Free => true,
            _ => false,
        }
    }
}
/// Holds a reservation for a `BatchManifest` log entry, keeping a
/// write batch open until it is sealed.
pub struct RecoveryGuard<'a> {
    // The pending manifest reservation; its payload is filled in by
    // `seal_batch`.
    batch_res: Reservation<'a>,
}
impl<'a> RecoveryGuard<'a> {
    /// Writes the current highest reserved lsn into the manifest
    /// reservation and completes it, committing the batch.
    pub fn seal_batch(mut self) -> Result<()> {
        let max_reserved =
            self.batch_res.log.iobufs.max_reserved_lsn.load(Acquire);
        self.batch_res.mark_writebatch(max_reserved);
        self.batch_res.complete().map(|_| ())
    }
}
/// A lock-free pagecache that supports linkable pages, in-memory
/// consolidation of page deltas, and persistence to an underlying
/// log-structured store.
pub struct PageCache {
    config: RunningConfig,
    // Maps pid -> the page's lock-free stack of updates.
    inner: PageTable<Page>,
    // Next never-before-used pid to hand out.
    next_pid_to_allocate: AtomicU64,
    // Freed pids available for reuse (max-heap: highest pid pops
    // first).
    free: Arc<Mutex<BinaryHeap<PageId>>>,
    log: Log,
    lru: Lru,
    // Count of link/replace operations, used to trigger snapshots
    // every `snapshot_after_ops`.
    updates: AtomicU64,
    last_snapshot: Arc<Mutex<Option<Snapshot>>>,
    // Monotonic ID generator; persisted to the counter page once per
    // `idgen_persist_interval` ids.
    idgen: Arc<AtomicU64>,
    idgen_persists: Arc<AtomicU64>,
    idgen_persist_mu: Arc<Mutex<()>>,
    // Whether prior on-disk state was found during `start`.
    was_recovered: bool,
}
// A page is a stack of updates plus their disk locations. `None` in
// an update slot means the data has been paged out and must be pulled
// back from disk via its `CacheInfo`.
type Page = Stack<(Option<Update>, CacheInfo)>;
// SAFETY(review): assumed sound because interior mutability goes
// through atomics, `Mutex`es, and the lock-free `Stack`/`PageTable`
// structures — confirm no field is reachable unsynchronized.
unsafe impl Send for PageCache {}
unsafe impl Sync for PageCache {}
impl Debug for PageCache {
    /// Renders a terse one-line summary: the next pid to hand out and
    /// the current free-pid heap.
    fn fmt(
        &self,
        f: &mut fmt::Formatter<'_>,
    ) -> std::result::Result<(), fmt::Error> {
        write!(
            f,
            "PageCache {{ max: {:?} free: {:?} }}\n",
            self.next_pid_to_allocate.load(Acquire),
            self.free
        )
    }
}
/// Test-only instrumentation: on drop, records every page's disk
/// pointers plus the current meta page into the event log so a
/// subsequent restart can be compared against this pre-restart state.
#[cfg(feature = "event_log")]
impl Drop for PageCache {
    fn drop(&mut self) {
        use std::collections::HashMap;
        trace!("dropping pagecache");
        // Only capture state when the system is not already poisoned
        // by a global error (e.g. an injected failpoint).
        if self.log.iobufs.config.global_error().is_ok() {
            let mut pages_before_restart = HashMap::new();
            let guard = pin();
            self.config.event_log.meta_before_restart(
                self.meta(&guard).expect("should get meta under test").clone(),
            );
            for pid in 0..self.next_pid_to_allocate.load(Acquire) {
                let pte = self.inner.get(pid);
                if pte.is_none() {
                    continue;
                }
                let head = pte.unwrap().deref().head(&guard);
                let pointers = pointers_from_stack(head, &guard);
                pages_before_restart.insert(pid, pointers);
            }
            self.config.event_log.pages_before_restart(pages_before_restart);
        }
        trace!("pagecache dropped");
    }
}
impl PageCache {
/// Instantiates a new `PageCache`, recovering any previous state
/// from disk, and creating the three reserved system pages (meta,
/// counter, persisted config) when they are missing.
pub fn start(config: RunningConfig) -> Result<Self> {
    trace!("starting pagecache");
    config.reset_global_error();
    // Load the most recent snapshot (or an empty default); the log is
    // started on top of it.
    let snapshot = read_snapshot_or_default(&config)?;
    let cache_capacity = config.cache_capacity;
    let lru = Lru::new(cache_capacity);
    let mut pc = Self {
        config: config.clone(),
        inner: PageTable::default(),
        next_pid_to_allocate: AtomicU64::new(0),
        free: Arc::new(Mutex::new(BinaryHeap::new())),
        log: Log::start(config, &snapshot)?,
        lru,
        updates: AtomicU64::new(0),
        last_snapshot: Arc::new(Mutex::new(Some(snapshot))),
        idgen_persist_mu: Arc::new(Mutex::new(())),
        idgen: Arc::new(AtomicU64::new(0)),
        idgen_persists: Arc::new(AtomicU64::new(0)),
        was_recovered: false,
    };
    // Populate the in-memory page table from the snapshot.
    pc.load_snapshot();
    #[cfg(feature = "event_log")]
    {
        // Test-only: record post-recovery disk pointers so they can be
        // compared with the state captured before the last shutdown.
        use std::collections::HashMap;
        let guard = pin();
        let mut pages_after_restart = HashMap::new();
        for pid in 0..pc.next_pid_to_allocate.load(Acquire) {
            let pte = pc.inner.get(pid);
            if pte.is_none() {
                continue;
            }
            let head = pte.unwrap().deref().head(&guard);
            let pointers = pointers_from_stack(head, &guard);
            pages_after_restart.insert(pid, pointers);
        }
        pc.config.event_log.pages_after_restart(pages_after_restart);
    }
    let mut was_recovered = true;
    {
        let guard = pin();
        // Each reserved page reports `ReportableBug` when absent,
        // which here means this is a fresh database that needs its
        // system pages allocated for the first time.
        if let Err(Error::ReportableBug(..)) = pc.get_meta(&guard) {
            was_recovered = false;
            let meta_update = Update::Meta(Meta::default());
            let (meta_id, _) = pc.allocate_inner(meta_update, &guard)?;
            assert_eq!(
                meta_id, META_PID,
                "we expect the meta page to have pid {}, but it had pid {} instead",
                META_PID, meta_id,
            );
        }
        if let Err(Error::ReportableBug(..)) = pc.get_idgen(&guard) {
            was_recovered = false;
            let counter_update = Update::Counter(0);
            let (counter_id, _) =
                pc.allocate_inner(counter_update, &guard)?;
            assert_eq!(
                counter_id, COUNTER_PID,
                "we expect the counter to have pid {}, but it had pid {} instead",
                COUNTER_PID, counter_id,
            );
        }
        if let Err(Error::ReportableBug(..)) =
            pc.get_persisted_config(&guard)
        {
            was_recovered = false;
            let config_update = Update::Config(StorageParameters {
                segment_size: pc.config.segment_size,
                use_compression: pc.config.use_compression,
                version: pc.config.version,
            });
            let (config_id, _) =
                pc.allocate_inner(config_update, &guard)?;
            // NOTE(review): message says "counter" but refers to the
            // persisted config page.
            assert_eq!(
                config_id, CONFIG_PID,
                "we expect the counter to have pid {}, but it had pid {} instead",
                CONFIG_PID, config_id,
            );
        }
        let (_, counter) = pc.get_idgen(&guard)?;
        // After recovery, skip ahead by two persist intervals so ids
        // handed out now can never collide with ids handed out before
        // the crash but not yet persisted.
        let idgen_recovery = if was_recovered {
            counter + (2 * pc.config.idgen_persist_interval)
        } else {
            0
        };
        // Round the persisted counter down to its interval boundary.
        let idgen_persists = counter / pc.config.idgen_persist_interval
            * pc.config.idgen_persist_interval;
        pc.idgen.store(idgen_recovery, Release);
        pc.idgen_persists.store(idgen_persists, Release);
    }
    pc.was_recovered = was_recovered;
    #[cfg(feature = "event_log")]
    {
        let guard = pin();
        pc.config.event_log.meta_after_restart(
            pc.meta(&guard)
                .expect("should be able to get meta under test")
                .clone(),
        );
    }
    trace!("pagecache started");
    Ok(pc)
}
/// Flushes any pending io buffers to disk, returning the flushed
/// byte count reported by the log.
pub fn flush(&self) -> Result<usize> {
    self.log.flush()
}
/// Creates a new page holding `new`, reusing freed pids when
/// possible. Returns the pid and a pointer for future atomic
/// operations on the page.
pub fn allocate<'g>(
    &self,
    new: Frag,
    guard: &'g Guard,
) -> Result<(PageId, PagePtr<'g>)> {
    // A fresh page always starts as a fully-consolidated state.
    self.allocate_inner(Update::Compact(new), guard)
}
/// Attempts to reclaim log space by rewriting one page chosen by the
/// segment accountant. Returns `Ok(true)` when a page was rewritten,
/// `Ok(false)` when nothing needed cleaning or the store is
/// read-only.
pub fn attempt_gc(&self) -> Result<bool> {
    if self.config.read_only {
        return Ok(false);
    }
    let guard = pin();
    // Ask the segment accountant for a page worth relocating;
    // COUNTER_PID is the pid excluded from selection.
    let outcome = match self.log.with_sa(|sa| sa.clean(COUNTER_PID)) {
        Some(dirty_pid) => self.rewrite_page(dirty_pid, &guard).map(|_| true),
        None => Ok(false),
    };
    guard.flush();
    outcome
}
/// Opens a write batch by reserving a `BatchManifest` log entry.
/// The returned guard's `seal_batch` fills in the manifest and
/// commits the batch.
pub fn pin_log(&self) -> Result<RecoveryGuard<'_>> {
    let batch_res = self.log.reserve(
        LogKind::Skip,
        BATCH_MANIFEST_PID,
        // Lsn-sized placeholder, overwritten by `mark_writebatch`.
        &[0; std::mem::size_of::<Lsn>()],
    )?;
    Ok(RecoveryGuard { batch_res })
}
/// Poisons the global error state with a failpoint error and wakes
/// any threads blocked on io-buffer interval updates so they observe
/// it promptly. Test-only plumbing.
#[doc(hidden)]
#[cfg(feature = "failpoints")]
pub fn set_failpoint(&self, e: Error) {
    if let Error::FailPoint = e {
        self.config.set_global_error(e);
        // Lock/unlock the intervals mutex before notifying so waiters
        // can't miss the wakeup.
        let _ = self.log.iobufs.intervals.lock();
        let _notified = self.log.iobufs.interval_updated.notify_all();
    }
}
/// Installs `new` into either a reused freed pid or a brand new pid.
/// Panics if a reused pid's stack is not in the expected `Free`
/// state, or if installing the initial state fails.
fn allocate_inner<'g>(
    &self,
    new: Update,
    guard: &'g Guard,
) -> Result<(PageId, PagePtr<'g>)> {
    let (pid, key) = if let Some(pid) = self.free.lock().pop() {
        trace!("re-allocating pid {}", pid);
        let stack = match self.inner.get(pid) {
            None => panic!(
                "expected to find existing stack \
                 for re-allocated pid {}",
                pid
            ),
            Some(p) => p,
        };
        let head = stack.head(guard);
        let mut stack_iter = StackIter::from_ptr(head, guard);
        let next_res = stack_iter.next();
        // A freed page must have `Update::Free` at its head; anything
        // else means the free list and page table disagree.
        if let Some((Some(Update::Free), cache_info)) = next_res {
            (pid, PagePtr { cached_pointer: head, ts: cache_info.ts })
        } else {
            panic!(
                "failed to re-allocate pid {} which \
                 contained unexpected state {:?}",
                pid, next_res
            )
        }
    } else {
        let pid = self.next_pid_to_allocate.fetch_add(1, Relaxed);
        trace!("allocating pid {} for the first time", pid);
        let new_stack = Stack::default();
        self.inner.insert(pid, new_stack);
        // A fresh stack has a null head and timestamp zero.
        (pid, PagePtr { cached_pointer: Shared::null(), ts: 0 })
    };
    let new_pointer =
        self.cas_page(pid, key, new, false, guard)?.unwrap_or_else(|e| {
            panic!(
                "should always be able to install \
                 a new page during allocation, but \
                 failed for pid {}: {:?}",
                pid, e
            )
        });
    Ok((pid, new_pointer))
}
/// Frees a page. The reserved system pages are rejected with
/// `Unsupported`. On success, the pid is queued for reuse once the
/// current epoch can no longer be observed by any thread.
pub fn free<'g>(
    &self,
    pid: PageId,
    old: PagePtr<'g>,
    guard: &'g Guard,
) -> Result<CasResult<'g, ()>> {
    trace!("attempting to free pid {}", pid);
    if pid == COUNTER_PID
        || pid == META_PID
        || pid == CONFIG_PID
        || pid == BATCH_MANIFEST_PID
    {
        return Err(Error::Unsupported(
            "you are not able to free the first \
             couple pages, which are allocated \
             for system internal purposes"
                .into(),
        ));
    }
    let new_pointer =
        self.cas_page(pid, old, Update::Free, false, guard)?;
    if new_pointer.is_ok() {
        // Defer pushing onto the free list until no thread can still
        // hold a reference to the page's previous state.
        let free = self.free.clone();
        guard.defer(move || {
            let mut free = free.lock();
            if free.iter().any(|e| e == &pid) {
                panic!("pid {} was double-freed", pid);
            }
            free.push(pid);
        });
    }
    // Drop the rejected Update payload; callers only need the pointer.
    Ok(new_pointer.map_err(|o| o.map(|(pointer, _)| (pointer, ()))))
}
/// Tries to atomically add `new` to the page as a delta.
/// Returns `Ok(Ok(new_key))` on success, `Ok(Err(None))` if the
/// page no longer exists, and `Ok(Err(Some((actual, new))))` when
/// the link lost a race, handing the frag back for retry.
pub fn link<'g>(
    &'g self,
    pid: PageId,
    mut old: PagePtr<'g>,
    new: Frag,
    guard: &'g Guard,
) -> Result<CasResult<'g, Frag>> {
    let _measure = Measure::new(&M.link_page);
    trace!("linking pid {} with {:?}", pid, new);
    #[cfg(any(test, feature = "lock_free_delays"))]
    {
        // Under test, randomly fail some links to exercise callers'
        // retry paths.
        use std::cell::RefCell;
        use std::time::{SystemTime, UNIX_EPOCH};
        thread_local! {
            pub static COUNT: RefCell<u32> = RefCell::new(1);
        }
        let time_now =
            SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
        #[allow(clippy::cast_possible_truncation)]
        let fail_seed = std::cmp::max(3, time_now.as_nanos() as u32 % 128);
        let inject_failure = COUNT.with(|c| {
            let mut cr = c.borrow_mut();
            *cr += 1;
            *cr % fail_seed == 0
        });
        if inject_failure {
            debug!(
                "injecting a randomized failure in the link of pid {}",
                pid
            );
            if let Some((current_pointer, _frag, _sz)) =
                self.get(pid, guard)?
            {
                return Ok(Err(Some((current_pointer, new))));
            } else {
                return Ok(Err(None));
            }
        }
    }
    let stack = match self.inner.get(pid) {
        None => return Ok(Err(None)),
        Some(p) => p,
    };
    let head = stack.head(guard);
    let stack_iter = StackIter::from_ptr(head, guard);
    let stack_len = stack_iter.size_hint().1.unwrap();
    // When the delta chain is long, merge everything into a single
    // frag and do a full replace instead of linking another delta.
    if stack_len >= PAGE_CONSOLIDATION_THRESHOLD {
        let current_frag = if let Some((current_pointer, frag, _sz)) =
            self.get(pid, guard)?
        {
            if old.ts != current_pointer.ts {
                // Timestamp mismatch: the page changed since `old` was
                // read, so the link must fail.
                assert!(
                    old.cached_pointer != current_pointer.cached_pointer
                );
                return Ok(Err(Some((current_pointer, new))));
            }
            frag
        } else {
            return Ok(Err(None));
        };
        let update: Frag = {
            let _measure = Measure::new(&M.merge_page);
            let mut update = current_frag.clone();
            update.merge(&new);
            update
        };
        return self.replace(pid, old, update, guard);
    }
    let bytes = measure(&M.serialize, || serialize(&new).unwrap());
    // Pre-build the stack node once so CAS retries can reuse the
    // allocation; the cache info holds placeholder values until a log
    // reservation succeeds.
    let mut new = {
        let update = Update::Append(new);
        let cache_info = CacheInfo {
            lsn: -1,
            pointer: DiskPtr::Inline(666_666_666),
            ts: 0,
            log_size: 0,
        };
        let node = stack::Node {
            inner: (Some(update), cache_info),
            next: Atomic::null(),
        };
        Some(Owned::new(node))
    };
    loop {
        let log_reservation =
            self.log.reserve(LogKind::Append, pid, &bytes)?;
        let lsn = log_reservation.lsn();
        let pointer = log_reservation.pointer();
        // A successful link bumps the page timestamp by one.
        let ts = old.ts + 1;
        let mut node = new.take().unwrap();
        let cache_info = CacheInfo {
            lsn,
            pointer,
            ts,
            log_size: log_reservation.reservation_len(),
        };
        // Overwrite the placeholder cache info with the real
        // reservation details.
        if let (Some(Update::Append(_)), ref mut stored_cache_info) =
            node.inner
        {
            *stored_cache_info = cache_info;
        } else {
            panic!("should only be working with Resident entries");
        }
        debug_delay();
        let result = stack.cap_node(old.cached_pointer, node, guard);
        match result {
            Ok(cached_pointer) => {
                trace!("link of pid {} succeeded", pid);
                let previous_head_lsn = old.last_lsn();
                assert_ne!(previous_head_lsn, 0);
                let previous_lsn_segment =
                    previous_head_lsn / self.config.segment_size as i64;
                let new_lsn_segment = lsn / self.config.segment_size as i64;
                // Only tell the segment accountant about the link when
                // it landed in a different segment from the previous
                // head entry.
                let to_clean = if previous_lsn_segment == new_lsn_segment {
                    self.log.with_sa(|sa| sa.clean(pid))
                } else {
                    self.log.with_sa(|sa| {
                        sa.mark_link(pid, lsn, pointer);
                        sa.clean(pid)
                    })
                };
                log_reservation.complete()?;
                if let Some(to_clean) = to_clean {
                    self.rewrite_page(to_clean, guard)?;
                }
                // Periodically advance the snapshot.
                let count = self.updates.fetch_add(1, Relaxed) + 1;
                let should_snapshot =
                    count % self.config.snapshot_after_ops == 0;
                if should_snapshot {
                    self.advance_snapshot()?;
                }
                return Ok(Ok(PagePtr { cached_pointer, ts }));
            }
            Err((actual_pointer, returned_new)) => {
                trace!("link of pid {} failed", pid);
                // Release the log space reserved for the failed CAS.
                log_reservation.abort()?;
                let actual_ts = unsafe { actual_pointer.deref().1.ts };
                if actual_ts == old.ts {
                    // Same timestamp: retry against the new head,
                    // reusing the node allocation.
                    new = Some(returned_new);
                    old = PagePtr {
                        cached_pointer: actual_pointer,
                        ts: actual_ts,
                    };
                } else {
                    // The page truly changed; hand the frag back to
                    // the caller along with the current pointer.
                    let returned_update = returned_new.0.clone().unwrap();
                    let returned_frag = returned_update.into_frag();
                    return Ok(Err(Some((
                        PagePtr {
                            cached_pointer: actual_pointer,
                            ts: actual_ts,
                        },
                        returned_frag,
                    ))));
                }
            }
        }
    }
}
/// Atomically replaces the page's contents with `new`.
/// Returns `Ok(Ok(new_key))` on success, `Ok(Err(None))` if the
/// page no longer exists, and `Ok(Err(Some((actual, new))))` when
/// the CAS lost a race, handing the frag back for retry.
pub fn replace<'g>(
    &self,
    pid: PageId,
    old: PagePtr<'g>,
    new: Frag,
    guard: &'g Guard,
) -> Result<CasResult<'g, Frag>> {
    let _measure = Measure::new(&M.replace_page);
    trace!("replacing pid {} with {:?}", pid, new);
    #[cfg(any(test, feature = "lock_free_delays"))]
    {
        // Under test, randomly fail some replaces to exercise callers'
        // retry paths.
        use std::cell::RefCell;
        use std::time::{SystemTime, UNIX_EPOCH};
        thread_local! {
            pub static COUNT: RefCell<u32> = RefCell::new(1);
        }
        let time_now =
            SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
        #[allow(clippy::cast_possible_truncation)]
        let fail_seed = std::cmp::max(3, time_now.as_nanos() as u32 % 128);
        let inject_failure = COUNT.with(|c| {
            let mut cr = c.borrow_mut();
            *cr += 1;
            *cr % fail_seed == 0
        });
        if inject_failure {
            debug!(
                "injecting a randomized failure in the replace of pid {}",
                pid
            );
            if let Some((current_pointer, _frag, _sz)) =
                self.get(pid, guard)?
            {
                return Ok(Err(Some((current_pointer, new))));
            } else {
                return Ok(Err(None));
            }
        }
    }
    let result =
        self.cas_page(pid, old, Update::Compact(new), false, guard)?;
    // Opportunistically relocate a page from a dirty segment,
    // excluding the page we just wrote.
    let to_clean = self.log.with_sa(|sa| sa.clean(pid));
    if let Some(to_clean) = to_clean {
        assert_ne!(pid, to_clean);
        self.rewrite_page(to_clean, guard)?;
    }
    // Periodically advance the snapshot.
    let count = self.updates.fetch_add(1, Relaxed) + 1;
    let should_snapshot = count % self.config.snapshot_after_ops == 0;
    if should_snapshot {
        self.advance_snapshot()?;
    }
    Ok(result.map_err(|fail| {
        let (pointer, shared) = fail.unwrap();
        if let Update::Compact(rejected_new) = shared {
            Some((pointer, rejected_new))
        } else {
            // cas_page hands back exactly the Compact we gave it.
            unreachable!();
        }
    }))
}
/// Relocates a page's current state to the head of the log so the
/// segment accountant can reclaim its old segment. Single blob
/// pages only get their inline pointer rewritten (the blob file
/// stays put); everything else is re-logged via `cas_page` with
/// `is_rewrite = true`.
fn rewrite_page(&self, pid: PageId, guard: &Guard) -> Result<()> {
    let _measure = Measure::new(&M.rewrite_page);
    trace!("rewriting pid {}", pid);
    let stack = match self.inner.get(pid) {
        None => {
            trace!("rewriting pid {} failed (no longer exists)", pid);
            return Ok(());
        }
        Some(p) => p,
    };
    debug_delay();
    let head = stack.head(guard);
    let stack_iter = StackIter::from_ptr(head, guard);
    let cache_entries: Vec<_> = stack_iter.collect();
    // Cheap path: a single blob-pointer entry — rewrite only the
    // small inline pointer record.
    if cache_entries.len() == 1 && cache_entries[0].1.pointer.is_blob() {
        trace!("rewriting blob with pid {}", pid);
        let blob_pointer = cache_entries[0].1.pointer.blob().1;
        let log_reservation =
            self.log.rewrite_blob_pointer(pid, blob_pointer)?;
        let new_pointer = log_reservation.pointer();
        let mut new_cache_entry = cache_entries[0].clone();
        new_cache_entry.1.pointer = new_pointer;
        let node = node_from_frag_vec(vec![new_cache_entry]);
        debug_delay();
        let result = stack.cas(head, node, guard);
        if result.is_ok() {
            // Tell the segment accountant which old locations this
            // replacement supersedes.
            let pointers = pointers_from_stack(head, guard);
            let lsn = log_reservation.lsn();
            self.log.with_sa(|sa| {
                sa.mark_replace(pid, lsn, pointers, new_pointer)
            })?;
            let _pointer = log_reservation.complete()?;
            trace!("rewriting pid {} succeeded", pid);
            Ok(())
        } else {
            // Lost the race; abort the reservation and let whoever
            // won carry the page forward.
            let _pointer = log_reservation.abort()?;
            trace!("rewriting pid {} failed", pid);
            Ok(())
        }
    } else {
        trace!("rewriting page with pid {}", pid);
        // Materialize the page's current state as a single update,
        // handling the reserved system pages specially.
        let (key, update): (_, Update) = if pid == META_PID {
            let (key, meta) = self.get_meta(guard)?;
            (key, Update::Meta(meta.clone()))
        } else if pid == COUNTER_PID {
            let (key, counter) = self.get_idgen(guard)?;
            (key, Update::Counter(counter))
        } else if pid == CONFIG_PID {
            let (key, config) = self.get_persisted_config(guard)?;
            (key, Update::Config(*config))
        } else if let Some((key, frag, _sz)) = self.get(pid, guard)? {
            (key, Update::Compact(frag.clone()))
        } else {
            // `get` returned None: the page may be freed. A Free state
            // still needs relocating so its old segment can be
            // reclaimed.
            let head = stack.head(guard);
            let mut stack_iter = StackIter::from_ptr(head, guard);
            match stack_iter.next() {
                Some((Some(Update::Free), cache_info)) => (
                    PagePtr { cached_pointer: head, ts: cache_info.ts },
                    Update::Free,
                ),
                other => {
                    debug!(
                        "when rewriting pid {} \
                         we encountered a rewritten \
                         node with a frag {:?} that \
                         we previously witnessed a Free \
                         for (PageCache::get returned None), \
                         assuming we can just return now since \
                         the Free was replace'd",
                        pid, other
                    );
                    return Ok(());
                }
            }
        };
        self.cas_page(pid, key, update, true, guard).map(|res| {
            trace!("rewriting pid {} success: {}", pid, res.is_ok());
        })
    }
}
/// Returns the ratio of on-disk bytes to logical data bytes, after
/// granting a fixed allowance of eight segments of overhead.
#[allow(clippy::cast_precision_loss)]
#[doc(hidden)]
pub fn space_amplification(&self) -> Result<f64> {
    let disk_bytes = self.size_on_disk()? as f64;
    let logical_bytes = self.logical_size_of_all_pages()? as f64;
    // Fixed overhead allowance: eight segments' worth of bytes.
    let overhead_allowance = self.config.segment_size as f64 * 8.;
    Ok(disk_bytes / (logical_bytes + overhead_allowance))
}
/// Returns the total on-disk footprint: the log file plus every file
/// in the blob directory.
fn size_on_disk(&self) -> Result<u64> {
    let mut total = self.config.file.metadata()?.len();
    // Any blob path shares the blob directory; use pointer 0 to
    // locate it.
    let sample_blob_path = self.config.blob_path(0);
    let blob_dir = sample_blob_path.parent().unwrap();
    for dir_entry in std::fs::read_dir(blob_dir)? {
        total += dir_entry?.metadata()?.len();
    }
    Ok(total)
}
/// Sums the logical sizes of every user page plus the reserved
/// system pages (meta, id-generator counter, persisted config).
fn logical_size_of_all_pages(&self) -> Result<u64> {
    let guard = pin();
    let meta_size = self.meta(&guard)?.size_in_bytes();
    let idgen_size = std::mem::size_of::<u64>() as u64;
    let config_size = self.get_persisted_config(&guard)?.1.size_in_bytes();
    let mut total = meta_size + idgen_size + config_size;
    // User pages begin just past the reserved system pids.
    let first_user_pid = CONFIG_PID + 1;
    let pid_limit = self.next_pid_to_allocate.load(Acquire);
    for pid in first_user_pid..pid_limit {
        if let Some((_, _, page_size)) = self.get(pid, &guard)? {
            total += page_size;
        }
    }
    Ok(total)
}
/// Atomically swaps a page's entire stack for a single new update,
/// logging the update first. `is_rewrite` keeps the existing
/// timestamp (the logical contents are unchanged) instead of
/// bumping it, and never retries on timestamp-equal races.
fn cas_page<'g>(
    &self,
    pid: PageId,
    mut old: PagePtr<'g>,
    update: Update,
    is_rewrite: bool,
    guard: &'g Guard,
) -> Result<CasResult<'g, Update>> {
    trace!(
        "cas_page called on pid {} to {:?} with old ts {:?}",
        pid,
        update,
        old.ts
    );
    let stack = match self.inner.get(pid) {
        None => {
            trace!("early-returning from cas_page, no stack found");
            return Ok(Err(None));
        }
        Some(p) => p,
    };
    let log_kind = log_kind_from_update(&update);
    let serialize_latency = Measure::new(&M.serialize);
    let bytes = match &update {
        Update::Counter(c) => serialize(&c).unwrap(),
        Update::Meta(m) => serialize(&m).unwrap(),
        Update::Config(c) => serialize(&c).unwrap(),
        // Free carries no payload; the log entry kind says it all.
        Update::Free => vec![],
        other => serialize(other.as_frag()).unwrap(),
    };
    drop(serialize_latency);
    // The update moves into the stack node on each attempt and is
    // recovered from the rejected node on failure.
    let mut update_opt = Some(update);
    loop {
        let log_reservation = self.log.reserve(log_kind, pid, &bytes)?;
        let lsn = log_reservation.lsn();
        let new_pointer = log_reservation.pointer();
        // Rewrites preserve the timestamp (same logical contents);
        // normal CAS bumps it.
        let ts = if is_rewrite { old.ts } else { old.ts + 1 };
        let cache_info = CacheInfo {
            ts,
            lsn,
            pointer: new_pointer,
            log_size: log_reservation.reservation_len(),
        };
        let node = node_from_frag_vec(vec![(
            Some(update_opt.take().unwrap()),
            cache_info,
        )]);
        debug_delay();
        let result = stack.cas(old.cached_pointer, node, guard);
        match result {
            Ok(cached_pointer) => {
                trace!("cas_page succeeded on pid {}", pid);
                // Tell the segment accountant which old locations this
                // replacement supersedes.
                let pointers =
                    pointers_from_stack(old.cached_pointer, guard);
                self.log.with_sa(|sa| {
                    sa.mark_replace(pid, lsn, pointers, new_pointer)
                })?;
                let _pointer = log_reservation.complete()?;
                return Ok(Ok(PagePtr { cached_pointer, ts }));
            }
            Err((actual_pointer, returned_entry)) => {
                trace!("cas_page failed on pid {}", pid);
                let _pointer = log_reservation.abort()?;
                // Recover the update so it can be retried or handed
                // back to the caller.
                let returned_update =
                    returned_entry.into_box().inner.0.take().unwrap();
                let actual_ts = unsafe { actual_pointer.deref().1.ts };
                if actual_ts != old.ts || is_rewrite {
                    return Ok(Err(Some((
                        PagePtr {
                            cached_pointer: actual_pointer,
                            ts: actual_ts,
                        },
                        returned_update,
                    ))));
                }
                // Same timestamp: benign race (e.g. a concurrent
                // consolidation); retry against the new head.
                trace!(
                    "retrying CAS on pid {} with same ts of {}",
                    pid,
                    old.ts
                );
                old =
                    PagePtr { cached_pointer: actual_pointer, ts: old.ts };
                update_opt = Some(returned_update);
            }
        }
    }
}
/// Fetches the META page, pulling it from disk first if it has been
/// paged out. `ReportableBug` is returned when the page is missing
/// entirely (which `start` treats as a fresh database).
pub(crate) fn get_meta<'g>(
    &self,
    guard: &'g Guard,
) -> Result<(PagePtr<'g>, &'g Meta)> {
    trace!("getting page iter for META");
    let stack = match self.inner.get(META_PID) {
        None => {
            return Err(Error::ReportableBug(
                "failed to retrieve META page \
                 which should always be present"
                    .into(),
            ));
        }
        Some(p) => p,
    };
    let head = stack.head(guard);
    match StackIter::from_ptr(head, guard).next() {
        Some((Some(Update::Meta(m)), cache_info)) => {
            Ok((PagePtr { cached_pointer: head, ts: cache_info.ts }, m))
        }
        Some((None, cache_info)) => {
            // Paged out: pull from disk, install, and retry.
            let update =
                self.pull(META_PID, cache_info.lsn, cache_info.pointer)?;
            let pointer =
                PagePtr { cached_pointer: head, ts: cache_info.ts };
            let _ =
                self.cas_page(META_PID, pointer, update, false, guard)?;
            self.get_meta(guard)
        }
        _ => Err(Error::ReportableBug(
            "failed to retrieve META page \
             which should always be present"
                .into(),
        )),
    }
}
/// Fetches the persisted storage-parameters page, pulling it from
/// disk first if it has been paged out. `ReportableBug` is returned
/// when the page is missing entirely (a fresh database).
pub(crate) fn get_persisted_config<'g>(
    &self,
    guard: &'g Guard,
) -> Result<(PagePtr<'g>, &'g StorageParameters)> {
    trace!("getting page iter for persisted config");
    let stack = match self.inner.get(CONFIG_PID) {
        None => {
            return Err(Error::ReportableBug(
                "failed to retrieve persisted config page \
                 which should always be present"
                    .into(),
            ));
        }
        Some(p) => p,
    };
    let head = stack.head(guard);
    match StackIter::from_ptr(head, guard).next() {
        Some((Some(Update::Config(config)), cache_info)) => Ok((
            PagePtr { cached_pointer: head, ts: cache_info.ts },
            config,
        )),
        Some((None, cache_info)) => {
            // Paged out: pull from disk, install, and retry.
            let update =
                self.pull(CONFIG_PID, cache_info.lsn, cache_info.pointer)?;
            let pointer =
                PagePtr { cached_pointer: head, ts: cache_info.ts };
            let _ =
                self.cas_page(CONFIG_PID, pointer, update, false, guard)?;
            self.get_persisted_config(guard)
        }
        _ => Err(Error::ReportableBug(
            "failed to retrieve CONFIG page \
             which should always be present"
                .into(),
        )),
    }
}
/// Fetches the ID-generator counter page, pulling it from disk
/// first if it has been paged out. `ReportableBug` is returned when
/// the page is missing entirely (a fresh database).
pub(crate) fn get_idgen<'g>(
    &self,
    guard: &'g Guard,
) -> Result<(PagePtr<'g>, u64)> {
    trace!("getting page iter for idgen");
    let stack = match self.inner.get(COUNTER_PID) {
        None => {
            return Err(Error::ReportableBug(
                "failed to retrieve idgen page \
                 which should always be present"
                    .into(),
            ));
        }
        Some(p) => p,
    };
    let head = stack.head(guard);
    match StackIter::from_ptr(head, guard).next() {
        Some((Some(Update::Counter(counter)), cache_info)) => Ok((
            PagePtr { cached_pointer: head, ts: cache_info.ts },
            *counter,
        )),
        Some((None, cache_info)) => {
            // Paged out: pull from disk, install, and retry.
            let update =
                self.pull(COUNTER_PID, cache_info.lsn, cache_info.pointer)?;
            let pointer =
                PagePtr { cached_pointer: head, ts: cache_info.ts };
            let _ =
                self.cas_page(COUNTER_PID, pointer, update, false, guard)?;
            self.get_idgen(guard)
        }
        _ => Err(Error::ReportableBug(
            "failed to retrieve idgen page \
             which should always be present"
                .into(),
        )),
    }
}
/// Retrieves a page by pid, returning a pointer for future CAS
/// operations, the page's materialized contents, and its total size
/// in the log. Returns `Ok(None)` if the page is free or absent;
/// the reserved system pages are rejected with `Unsupported`.
///
/// As a side effect, consolidates the page's delta chain in memory
/// (pulling any paged-out entries from disk) and records the access
/// in the LRU, possibly paging out colder pages.
pub fn get<'g>(
    &self,
    pid: PageId,
    guard: &'g Guard,
) -> Result<Option<(PagePtr<'g>, &'g Frag, u64)>> {
    trace!("getting page iterator for pid {}", pid);
    let _measure = Measure::new(&M.get_page);
    if pid == COUNTER_PID
        || pid == META_PID
        || pid == CONFIG_PID
        || pid == BATCH_MANIFEST_PID
    {
        return Err(Error::Unsupported(
            "you are not able to iterate over \
             the first couple pages, which are \
             reserved for storing metadata and \
             monotonic ID generator info"
                .into(),
        ));
    }
    let stack = match self.inner.get(pid) {
        None => return Ok(None),
        Some(p) => p,
    };
    let head = stack.head(guard);
    // Snapshot the whole update chain, newest entry first.
    let entries: Vec<_> = StackIter::from_ptr(head, guard).collect();
    let is_free = if let Some((Some(entry), _)) = entries.first() {
        entry.is_free()
    } else {
        false
    };
    if entries.is_empty() || is_free {
        return Ok(None);
    }
    let total_page_size = entries
        .iter()
        .map(|(_, cache_info)| cache_info.log_size as u64)
        .sum();
    // Fast path: the head is already a fully-consolidated Compact
    // state, so nothing needs merging or pulling.
    let initial_base = match entries[0] {
        (Some(Update::Compact(compact)), cache_info) => {
            return Ok(Some((
                PagePtr { cached_pointer: head, ts: cache_info.ts },
                compact,
                total_page_size,
            )));
        }
        (Some(Update::Append(_)), _) => {
            // If every entry down to the first Compact is resident,
            // merge them in memory without touching disk.
            let base_idx = entries.iter().position(|(e, _)| {
                e.is_some() && e.as_ref().unwrap().is_compact()
            });
            if let Some(base_idx) = base_idx {
                let mut base =
                    entries[base_idx].0.as_ref().unwrap().as_frag().clone();
                // Merge the deltas oldest-first into the base.
                for (append, _) in entries[0..base_idx].iter().rev() {
                    base.merge(append.as_ref().unwrap().as_frag());
                }
                Some(base)
            } else {
                None
            }
        }
        _ => {
            // Head is paged out (`None`): fall through to the slow
            // path that pulls from disk.
            None
        }
    };
    let base = if let Some(initial_base) = initial_base {
        initial_base
    } else {
        // Slow path: pull any paged-out entries from disk, then merge
        // the whole chain from the back (entries are newest-first).
        let pulled = entries.iter().map(|entry| match entry {
            (Some(Update::Compact(compact)), _)
            | (Some(Update::Append(compact)), _) => {
                Ok(Cow::Borrowed(compact))
            }
            (None, cache_info) => {
                let res = self
                    .pull(pid, cache_info.lsn, cache_info.pointer)
                    .map(|pg| pg)?;
                Ok(Cow::Owned(res.into_frag()))
            }
            other => {
                panic!("iterating over unexpected update: {:?}", other);
            }
        });
        let mut successes: Vec<Cow<'_, Frag>> = match pulled.collect() {
            Ok(success) => success,
            // NOTE(review): a NotFound while pulling presumably means
            // a concurrent rewrite removed the blob — treated as an
            // absent page; confirm against blob GC behavior.
            Err(Error::Io(ref error))
                if error.kind() == std::io::ErrorKind::NotFound =>
            {
                return Ok(None);
            }
            Err(error) => return Err(error),
        };
        let mut base = successes.pop().unwrap().into_owned();
        while let Some(frag) = successes.pop() {
            base.merge(&frag);
        }
        base
    };
    // Install the consolidated state back into the stack, keeping
    // each entry's CacheInfo for space accounting.
    let mut frags: Vec<(Option<Update>, CacheInfo)> =
        entries.iter().map(|(_, cache_info)| (None, *cache_info)).collect();
    frags[0].0 = Some(Update::Compact(base));
    let node = node_from_frag_vec(frags).into_shared(guard);
    #[cfg(feature = "event_log")]
    assert_eq!(
        pointers_from_stack(head, guard),
        pointers_from_stack(node, guard),
    );
    let node = unsafe { node.into_owned() };
    debug_delay();
    let res = stack.cas(head, node, guard);
    if let Ok(new_pointer) = res {
        trace!("fix-up for pid {} succeeded", pid);
        // Record the access and evict colder pages if over capacity.
        let to_evict = self.lru.accessed(pid, total_page_size);
        trace!("accessed pid {} -> paging out pids {:?}", pid, to_evict);
        if !to_evict.is_empty() {
            self.page_out(to_evict, guard)?;
        }
        let page_ref = unsafe {
            // SAFETY: we just installed this node, and it holds the
            // Compact update built above.
            let item = &new_pointer.deref().inner;
            if let (Some(Update::Compact(compact)), _) = item {
                compact
            } else {
                panic!()
            }
        };
        let pointer =
            PagePtr { cached_pointer: new_pointer, ts: entries[0].1.ts };
        Ok(Some((pointer, page_ref, total_page_size)))
    } else {
        // Lost the consolidation race; retry from scratch.
        trace!("fix-up for pid {} failed", pid);
        self.get(pid, guard)
    }
}
/// Returns the log's current stable offset — the lsn up to which
/// data has been flushed.
pub fn stable_lsn(&self) -> Lsn {
    self.log.stable_offset()
}
/// Flushes the log up through `lsn`, returning the byte count
/// reported by the log.
pub fn make_stable(&self, lsn: Lsn) -> Result<usize> {
    self.log.make_stable(lsn)
}
/// Returns `true` if existing state was found on disk during
/// `start`, `false` for a brand new database.
pub const fn was_recovered(&self) -> bool {
    self.was_recovered
}
/// Returns a monotonically increasing ID. The counter page is only
/// persisted once per `idgen_persist_interval` ids, and `start`
/// skips ahead after recovery, so ids are unique across restarts
/// but may have gaps.
pub fn generate_id(&self) -> Result<u64> {
    let ret = self.idgen.fetch_add(1, Relaxed);
    let interval = self.config.idgen_persist_interval;
    // The interval boundary this id requires to have been persisted.
    let necessary_persists = ret / interval * interval;
    let mut persisted = self.idgen_persists.load(Acquire);
    while persisted < necessary_persists {
        // Serialize persist attempts, then re-check under the lock.
        let _mu = self.idgen_persist_mu.lock();
        persisted = self.idgen_persists.load(Acquire);
        if persisted < necessary_persists {
            let guard = pin();
            let (key, current) = self.get_idgen(&guard)?;
            assert_eq!(current, persisted);
            let counter_update = Update::Counter(necessary_persists);
            let old = self.idgen_persists.swap(necessary_persists, Release);
            assert_eq!(old, persisted);
            if self
                .cas_page(
                    COUNTER_PID,
                    key.clone(),
                    counter_update,
                    false,
                    &guard,
                )?
                .is_err()
            {
                // Lost a race against a concurrent counter persist;
                // loop and re-evaluate.
                continue;
            }
            // If we skipped multiple intervals, do a full flush;
            // otherwise just make the counter's entry stable.
            let gap = (necessary_persists - persisted) / interval;
            if gap > 1 {
                let _written = self.flush()?;
            } else if key.last_lsn() > self.stable_lsn() {
                let _written = self.make_stable(key.last_lsn())?;
            }
        }
    }
    Ok(ret)
}
/// Returns the current `Meta` page, which maps collection names to
/// their root pids.
pub fn meta<'a>(&self, guard: &'a Guard) -> Result<&'a Meta> {
    self.get_meta(guard).map(|(_k, m)| m)
}
pub fn meta_pid_for_name(
&self,
name: &[u8],
guard: &Guard,
) -> Result<PageId> {
let m = self.meta(guard)?;
if let Some(root) = m.get_root(name) {
Ok(root)
} else {
Err(Error::CollectionNotFound(name.to_vec()))
}
}
/// Atomically changes the root pid recorded in META for `name` from
/// `old` to `new` (`None` meaning absent/deleted). Returns
/// `Ok(Err(actual))` when the current root does not match `old`.
pub fn cas_root_in_meta<'g>(
    &self,
    name: &[u8],
    old: Option<PageId>,
    new: Option<PageId>,
    guard: &'g Guard,
) -> Result<std::result::Result<(), Option<PageId>>> {
    loop {
        let (meta_key, meta) = self.get_meta(guard)?;
        let actual = meta.get_root(name);
        if actual != old {
            return Ok(Err(actual));
        }
        // Build the new META contents before attempting the swap.
        let mut new_meta = (*meta).clone();
        if let Some(new) = new {
            new_meta.set_root(name.to_vec(), new);
        } else {
            new_meta.del_root(name);
        }
        let new_meta_frag = Update::Meta(new_meta);
        let res = self.cas_page(
            META_PID,
            meta_key.clone(),
            new_meta_frag,
            false,
            guard,
        )?;
        match res {
            Ok(_worked) => return Ok(Ok(())),
            // Lost a race against a concurrent META update: re-read
            // and retry from the top.
            Err(Some((_current_pointer, _rejected))) => {}
            Err(None) => {
                return Err(Error::ReportableBug(
                    "replacing the META page has failed because \
                     the pagecache does not think it currently exists."
                        .into(),
                ));
            }
        }
    }
}
/// Evicts the in-memory representation of the given pages, replacing
/// each stack entry's materialized update with `None` while keeping
/// its `CacheInfo` (so the page can be pulled back from disk later).
///
/// Pages are skipped when they are one of the reserved system pages,
/// are no longer present in the page table, or contain an entry that
/// is already paged out (`None`) or freed (`Update::Free`).
fn page_out(&self, to_evict: Vec<PageId>, guard: &Guard) -> Result<()> {
let _measure = Measure::new(&M.page_out);
'different_page_eviction: for pid in to_evict {
// Reserved system pages are never evicted.
if pid == COUNTER_PID
|| pid == META_PID
|| pid == CONFIG_PID
|| pid == BATCH_MANIFEST_PID
{
continue;
}
let stack = match self.inner.get(pid) {
None => continue 'different_page_eviction,
Some(p) => p,
};
debug_delay();
let head = stack.head(guard);
let stack_iter = StackIter::from_ptr(head, guard);
// NOTE(review): relies on StackIter reporting an exact upper
// bound in its size_hint — confirm against StackIter's impl.
let stack_len = stack_iter.size_hint().1.unwrap();
let mut new_stack = Vec::with_capacity(stack_len);
for (update_opt, cache_info) in stack_iter {
match update_opt {
// Already paged out or freed: nothing to reclaim here,
// move on to the next pid entirely.
None | Some(Update::Free) => {
continue 'different_page_eviction;
}
// Drop the materialized update, keep the disk location.
Some(_) => {
new_stack.push((None, *cache_info));
}
}
}
let node = node_from_frag_vec(new_stack);
debug_delay();
// CAS the rebuilt (dematerialized) stack in; a failure just means
// a concurrent writer changed the page — eviction is best-effort.
let result = stack.cas(head, node, guard);
if result.is_ok() {
} else {
trace!("failed to page-out pid {}", pid)
}
}
Ok(())
}
/// Reads a single message back from the log at (`lsn`, `pointer`) and
/// deserializes it into an `Update` according to its message kind.
///
/// Validates that the header's pid and lsn match what was requested;
/// a mismatch is treated as a broken invariant and panics via
/// `assert_eq!`. Unexpected read results map to `Error::Corruption`.
fn pull(&self, pid: PageId, lsn: Lsn, pointer: DiskPtr) -> Result<Update> {
use MessageKind::*;
trace!("pulling lsn {} pointer {} from disk", lsn, pointer);
let _measure = Measure::new(&M.pull);
let (header, bytes) = match self.log.read(pid, lsn, pointer) {
Ok(LogRead::Inline(header, buf, _len)) => {
assert_eq!(
header.pid, pid,
"expected pid {} on pull of pointer {}, \
but got {} instead",
pid, pointer, header.pid
);
assert_eq!(
header.lsn, lsn,
"expected lsn {} on pull of pointer {}, \
but got lsn {} instead",
lsn, pointer, header.lsn
);
Ok((header, buf))
}
// Blob messages carry the same header checks as inline ones.
Ok(LogRead::Blob(header, buf, _blob_pointer)) => {
assert_eq!(
header.pid, pid,
"expected pid {} on pull of pointer {}, \
but got {} instead",
pid, pointer, header.pid
);
assert_eq!(
header.lsn, lsn,
"expected lsn {} on pull of pointer {}, \
but got lsn {} instead",
lsn, pointer, header.lsn
);
Ok((header, buf))
}
// Any other read result (failed/corrupt/cancelled) is corruption
// from the perspective of a pull that expected a live message.
Ok(other) => {
debug!("read unexpected page: {:?}", other);
Err(Error::Corruption { at: pointer })
}
Err(e) => {
debug!("failed to read page: {:?}", e);
Err(e)
}
}?;
let deserialize_latency = Measure::new(&M.deserialize);
// Dispatch deserialization on the message kind recorded on disk.
let update_res = match header.kind {
Counter => deserialize::<u64>(&bytes).map(Update::Counter),
BlobMeta | InlineMeta => {
deserialize::<Meta>(&bytes).map(Update::Meta)
}
BlobConfig | InlineConfig => {
deserialize::<StorageParameters>(&bytes).map(Update::Config)
}
BlobAppend | InlineAppend => {
deserialize::<Frag>(&bytes).map(Update::Append)
}
BlobReplace | InlineReplace => {
deserialize::<Frag>(&bytes).map(Update::Compact)
}
Free => Ok(Update::Free),
// These kinds never correspond to page content and should never
// be reachable through a page's pointer chain.
Corrupted | Cancelled | Pad | BatchManifest => {
panic!("unexpected pull: {:?}", header.kind)
}
};
drop(deserialize_latency);
// NOTE(review): deserialization failure here panics rather than
// returning Err — presumably acceptable because the payload already
// passed the log's integrity checks, so a failure is a bug; confirm.
let update =
update_res.map_err(|_| ()).expect("failed to deserialize data");
// Free entries are handled elsewhere; pulling one is a logic error.
if let Update::Free = update {
Err(Error::ReportableBug(
"non-append/compact found in pull".to_owned(),
))
} else {
Ok(update)
}
}
/// Asynchronously generates a new snapshot of the log on a background
/// thread. Skips silently if a previous snapshot attempt still holds
/// the snapshot mutex. Errors from the background task (other than a
/// NotFound I/O error) are recorded as the global error.
fn advance_snapshot(&self) -> Result<()> {
let snapshot_mu = self.last_snapshot.clone();
let config = self.config.clone();
let iobufs = self.log.iobufs.clone();
let gen_snapshot = move || {
// try_lock: if another snapshot is in flight, do nothing rather
// than block the threadpool worker.
let snapshot_opt_res = snapshot_mu.try_lock();
if snapshot_opt_res.is_none() {
debug!(
"snapshot skipped because previous attempt \
appears not to have completed"
);
return Ok(());
}
let mut snapshot_opt = snapshot_opt_res.unwrap();
// The Option is only None transiently while a snapshot is being
// regenerated; recovery must have populated it before this runs.
let last_snapshot = snapshot_opt
.take()
.expect("PageCache::advance_snapshot called before recovery");
if let Err(e) = iobuf::flush(&iobufs) {
error!("failed to flush log during advance_snapshot: {}", e);
iobufs.with_sa(SegmentAccountant::resume_rewriting);
// Put the old snapshot back so a later attempt can retry.
*snapshot_opt = Some(last_snapshot);
return Err(e);
}
// Segment rewriting must be paused while we iterate the log,
// and resumed afterwards regardless of the outcome.
iobufs.with_sa(SegmentAccountant::pause_rewriting);
let last_lsn = last_snapshot.last_lsn;
// Align the scan start to the beginning of the segment that
// contains the last snapshotted lsn.
let start_lsn = last_lsn - (last_lsn % config.segment_size as Lsn);
let iter = iobufs.iter_from(start_lsn);
debug!(
"snapshot starting from offset {} to the segment containing ~{}",
last_snapshot.last_lsn,
iobufs.stable(),
);
let res = advance_snapshot(iter, last_snapshot, &config);
iobufs.with_sa(SegmentAccountant::resume_rewriting);
match res {
Err(e) => {
// The old snapshot was consumed; fall back to a default
// so the slot is not left empty.
*snapshot_opt = Some(Snapshot::default());
error!("failed to generate snapshot: {:?}", e);
Err(e)
}
Ok(next_snapshot) => {
*snapshot_opt = Some(next_snapshot);
Ok(())
}
}
};
// Bail out early (and wake any waiters) if the system is already in a
// failed state.
if let Err(e) = self.config.global_error() {
let _notified = self.log.iobufs.interval_updated.notify_all();
return Err(e);
}
debug!("asynchronously spawning snapshot generation task");
let config = self.config.clone();
let _result = threadpool::spawn(move || {
let result = gen_snapshot();
match &result {
Ok(_) => {}
// A NotFound during shutdown/teardown is tolerated silently.
Err(Error::Io(ref ioe))
if ioe.kind() == std::io::ErrorKind::NotFound => {}
Err(error) => {
error!(
"encountered error while generating snapshot: {:?}",
error,
);
config.set_global_error(error.clone());
}
}
result
});
// In tests, surface the background result synchronously so failures
// are deterministic.
#[cfg(test)]
_result.unwrap()?;
Ok(())
}
/// Rebuilds the in-memory page table from the recovered snapshot.
///
/// For every pid up to the snapshot's maximum, installs a stack whose
/// entries carry only disk locations (no materialized updates); pages
/// are pulled lazily on first access. Freed pages are additionally
/// registered in the free list for reuse.
fn load_snapshot(&mut self) {
// Called with exclusive access (&mut self) during startup, so the
// snapshot lock is expected to be uncontended and populated.
let snapshot_mu = self.last_snapshot.try_lock().unwrap();
let snapshot = snapshot_mu.as_ref().unwrap();
let next_pid_to_allocate = if snapshot.pt.is_empty() {
0
} else {
*snapshot.pt.keys().max().unwrap() + 1
};
self.next_pid_to_allocate = AtomicU64::from(next_pid_to_allocate);
debug!("load_snapshot loading pages from 0..{}", next_pid_to_allocate);
for pid in 0..next_pid_to_allocate {
// Pid space must be dense below the max; a gap means the
// snapshot is internally inconsistent.
let state = if let Some(state) = snapshot.pt.get(&pid) {
state
} else {
panic!(
"load_snapshot pid {} not found, despite being below the max pid {}",
pid, next_pid_to_allocate
);
};
trace!("load_snapshot pid {} {:?}", pid, state);
let stack = Stack::default();
let guard = pin();
match *state {
PageState::Present(ref pointers) => {
// Install disk locations only; `None` marks the update
// as not yet materialized in memory.
for &(lsn, pointer, sz) in pointers {
let cache_info =
CacheInfo { lsn, pointer, log_size: sz, ts: 0 };
stack.push((None, cache_info), &guard);
}
}
PageState::Free(lsn, pointer) => {
trace!("load_snapshot freeing pid {}", pid);
let cache_info = CacheInfo {
lsn,
pointer,
// A Free record occupies exactly one message header.
log_size: MSG_HEADER_LEN,
ts: 0,
};
stack.push((Some(Update::Free), cache_info), &guard);
self.free.lock().push(pid);
}
}
trace!("installing stack for pid {}", pid);
self.inner.insert(pid, stack);
}
}
}
/// Collects the on-disk pointer of every entry in the stack rooted at
/// `head_pointer`, in iteration order.
fn pointers_from_stack<'g>(
    head_pointer: PagePtrInner<'g>,
    guard: &'g Guard,
) -> Vec<DiskPtr> {
    StackIter::from_ptr(head_pointer, guard)
        .map(|(_update, cache_info)| cache_info.pointer)
        .collect()
}