#![allow(unsafe_code)]
pub mod constants;
pub mod logger;
mod blob_io;
mod disk_pointer;
mod header;
mod iobuf;
mod iterator;
mod pagetable;
#[cfg(any(all(not(unix), not(windows)), miri))]
mod parallel_io_polyfill;
#[cfg(all(unix, not(miri)))]
mod parallel_io_unix;
#[cfg(all(windows, not(miri)))]
mod parallel_io_windows;
mod reservation;
mod segment;
mod snapshot;
use std::{collections::BinaryHeap, ops::Deref};
use crate::*;
#[cfg(any(all(not(unix), not(windows)), miri))]
use parallel_io_polyfill::{pread_exact, pread_exact_or_eof, pwrite_all};
#[cfg(all(unix, not(miri)))]
use parallel_io_unix::{pread_exact, pread_exact_or_eof, pwrite_all};
#[cfg(all(windows, not(miri)))]
use parallel_io_windows::{pread_exact, pread_exact_or_eof, pwrite_all};
use self::{
blob_io::{gc_blobs, read_blob, remove_blob, write_blob},
constants::{
BATCH_MANIFEST_PID, COUNTER_PID, META_PID,
PAGE_CONSOLIDATION_THRESHOLD, SEGMENT_CLEANUP_THRESHOLD,
},
header::Header,
iobuf::{roll_iobuf, IoBuf, IoBufs},
iterator::{raw_segment_iter_from, LogIter},
pagetable::PageTable,
segment::{SegmentAccountant, SegmentCleaner, SegmentOp},
};
pub(crate) use self::{
logger::{
read_message, read_segment_header, MessageHeader, SegmentHeader,
SegmentNumber,
},
reservation::Reservation,
snapshot::{read_snapshot_or_default, PageState, Snapshot},
};
pub use self::{
constants::{
MAX_MSG_HEADER_LEN, MAX_SPACE_AMPLIFICATION, MINIMUM_ITEMS_PER_SEGMENT,
SEG_HEADER_LEN,
},
disk_pointer::DiskPtr,
logger::{Log, LogRead},
};
pub type SegmentId = usize;
pub type LogOffset = u64;
pub type BlobPointer = Lsn;
pub type Lsn = i64;
pub type PageId = u64;
/// Marker value written to the log to record the maximum LSN of an
/// atomic write batch. A thin newtype over `Lsn` so it serializes as a
/// plain LSN while remaining a distinct type at the API level.
#[derive(Default, Clone, Copy, Ord, PartialOrd, Eq, PartialEq, Debug)]
#[repr(transparent)]
pub struct BatchManifest(pub Lsn);
/// A byte buffer paired with the log offset it was read from / is
/// destined for.
#[derive(Debug)]
pub struct BasedBuf {
// the raw bytes
pub buf: Vec<u8>,
// the log file offset this buffer is "based" at
pub offset: LogOffset,
}
/// Tag byte identifying the kind of a message stored in the log.
/// `From<u8>` below decodes these (mapping unknown bytes to
/// `Corrupted`), and `From<MessageKind> for LogKind` groups them into
/// coarser log semantics. The `Inline*`/`Blob*` pairs appear to
/// distinguish payloads stored inline in the log from payloads stored
/// in external blob files (see `blob_io` imports above).
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
#[repr(u8)]
pub enum MessageKind {
Corrupted = 0,
Canceled = 1,
Cap = 2,
BatchManifest = 3,
Free = 4,
Counter = 5,
InlineMeta = 6,
BlobMeta = 7,
InlineNode = 8,
BlobNode = 9,
InlineLink = 10,
BlobLink = 11,
}
impl MessageKind {
/// Returns the on-disk tag byte for this kind (the `#[repr(u8)]`
/// discriminant). Inverse of the `From<u8>` impl below.
pub(crate) const fn into(self) -> u8 {
self as u8
}
}
impl From<u8> for MessageKind {
/// Decodes a tag byte read from the log back into a `MessageKind`.
/// Any byte outside 0..=11 is treated as `Corrupted` (with a debug
/// log) rather than panicking, since torn or garbage data is
/// expected during recovery.
fn from(byte: u8) -> Self {
use MessageKind::*;
match byte {
0 => Corrupted,
1 => Canceled,
2 => Cap,
3 => BatchManifest,
4 => Free,
5 => Counter,
6 => InlineMeta,
7 => BlobMeta,
8 => InlineNode,
9 => BlobNode,
10 => InlineLink,
11 => BlobLink,
other => {
debug!("encountered unexpected message kind byte {}", other);
Corrupted
}
}
}
}
/// Coarse classification of a log message's effect on a page:
/// a full replacement, an incremental link, a free, a message that
/// should be skipped during recovery, or corrupted data.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum LogKind {
Replace,
Link,
Free,
Skip,
Corrupted,
}
/// Maps an in-memory `Update` to the `LogKind` that should tag its
/// log entry: links log as `Link`, frees as `Free`, and full-state
/// writes (node, counter, meta) as `Replace`.
fn log_kind_from_update(update: &Update) -> LogKind {
    match update {
        Update::Link(..) => LogKind::Link,
        Update::Node(..) | Update::Meta(..) | Update::Counter(..) => {
            LogKind::Replace
        }
        Update::Free => LogKind::Free,
    }
}
impl From<MessageKind> for LogKind {
/// Collapses the twelve message kinds into the five coarse log
/// kinds used during recovery and reservation.
fn from(kind: MessageKind) -> Self {
match kind {
MessageKind::Free => LogKind::Free,
// full-state payloads, inline or blob, are replacements
MessageKind::InlineNode
| MessageKind::Counter
| MessageKind::BlobNode
| MessageKind::InlineMeta
| MessageKind::BlobMeta => LogKind::Replace,
MessageKind::InlineLink | MessageKind::BlobLink => LogKind::Link,
// bookkeeping messages carry no page state
MessageKind::Canceled
| MessageKind::Cap
| MessageKind::BatchManifest => LogKind::Skip,
// only Corrupted remains; logged and passed through
other => {
debug!("encountered unexpected message kind byte {:?}", other);
LogKind::Corrupted
}
}
}
}
/// Converts an integer to `usize`, panicking (rather than silently
/// truncating) if the value does not fit.
fn assert_usize<T>(from: T) -> usize
where
    usize: TryFrom<T, Error = std::num::TryFromIntError>,
{
    // TryInto is blanket-implemented from the TryFrom bound above.
    let converted: usize =
        from.try_into().expect("lost data cast while converting to usize");
    converted
}
/// Monotonically raises `atomic_lsn` to at least `to`, leaving it
/// untouched if it is already greater than or equal to `to`.
///
/// Rewritten from a manual `compare_and_swap` retry loop:
/// `compare_and_swap` has been deprecated since Rust 1.50, and
/// `fetch_max` expresses the "store the maximum" semantics directly
/// in a single atomic RMW.
fn bump_atomic_lsn(atomic_lsn: &AtomicLsn, to: Lsn) {
    // Cheap read-only fast path, mirroring the original's early
    // return before attempting any RMW.
    if atomic_lsn.load(Acquire) >= to {
        return;
    }
    atomic_lsn.fetch_max(to, SeqCst);
}
use std::convert::{TryFrom, TryInto};
#[inline]
pub(crate) fn lsn_to_arr(number: Lsn) -> [u8; 8] {
number.to_le_bytes()
}
/// Decodes 8 little-endian bytes into an LSN; inverse of
/// `lsn_to_arr`. Panics if `arr` is not exactly 8 bytes long.
#[inline]
pub(crate) fn arr_to_lsn(arr: &[u8]) -> Lsn {
    let bytes: [u8; 8] = arr.try_into().unwrap();
    Lsn::from_le_bytes(bytes)
}
/// Encodes a `u64` as 8 little-endian bytes.
#[inline]
pub(crate) fn u64_to_arr(number: u64) -> [u8; 8] {
    u64::to_le_bytes(number)
}
/// Decodes 4 little-endian bytes into a `u32`; inverse of
/// `u32_to_arr`. Panics if `arr` is not exactly 4 bytes long.
#[inline]
pub(crate) fn arr_to_u32(arr: &[u8]) -> u32 {
    let bytes: [u8; 4] = arr.try_into().unwrap();
    u32::from_le_bytes(bytes)
}
/// Encodes a `u32` as 4 little-endian bytes; inverse of `arr_to_u32`.
#[inline]
pub(crate) fn u32_to_arr(number: u32) -> [u8; 4] {
    u32::to_le_bytes(number)
}
/// Decompresses a buffer read from disk when the `compression`
/// feature is enabled; otherwise returns the input unchanged.
///
/// With compression on, the buffer is expected to begin with a
/// serialized u64 length frame (consumed and discarded here) followed
/// by a zstd stream. Decompression failure is treated as an
/// unrecoverable bug and panics with a report-an-issue message.
#[allow(clippy::needless_pass_by_value)]
pub(crate) fn maybe_decompress(in_buf: Vec<u8>) -> std::io::Result<Vec<u8>> {
#[cfg(feature = "compression")]
{
use zstd::stream::decode_all;
// scoot a reborrowed slice forward as the varint is consumed
let scootable_in_buf = &mut &*in_buf;
let _ivec_varint = u64::deserialize(scootable_in_buf)
.expect("this had to be serialized with an extra length frame");
let _measure = Measure::new(&M.decompress);
let out_buf = decode_all(scootable_in_buf).expect(
"failed to decompress data. \
This is not expected, please open an issue on \
https://github.com/spacejam/sled so we can \
fix this critical issue ASAP. Thank you :)",
);
Ok(out_buf)
}
#[cfg(not(feature = "compression"))]
Ok(in_buf)
}
/// A guard-scoped view of a page known to contain a `Node`, derefing
/// straight to the node. Panics on deref if the page is not a node.
#[derive(Debug, Clone, Copy)]
pub struct NodeView<'g>(pub(crate) PageView<'g>);
impl<'g> Deref for NodeView<'g> {
type Target = Node;
fn deref(&self) -> &Node {
self.0.as_node()
}
}
// SAFETY: NOTE(review) — marked Send/Sync like the underlying
// PageView; soundness rests on the epoch-guard protocol that keeps
// the pointed-to Page alive for 'g. Not provable from this file alone.
unsafe impl<'g> Send for NodeView<'g> {}
unsafe impl<'g> Sync for NodeView<'g> {}
/// A guard-scoped view of the META page, derefing straight to the
/// `Meta` value. Panics on deref if the page is not meta.
#[derive(Debug, Clone, Copy)]
pub struct MetaView<'g>(PageView<'g>);
impl<'g> Deref for MetaView<'g> {
type Target = Meta;
fn deref(&self) -> &Meta {
self.0.as_meta()
}
}
// SAFETY: NOTE(review) — same justification as NodeView above:
// relies on the epoch guard keeping the Page alive for 'g.
unsafe impl<'g> Send for MetaView<'g> {}
unsafe impl<'g> Sync for MetaView<'g> {}
/// A snapshot of one pagetable slot: the `Shared` pointer that was
/// read (`read`) plus the slot itself (`entry`) so callers can later
/// CAS a replacement against the exact pointer they observed.
#[derive(Debug, Clone, Copy)]
pub struct PageView<'g> {
// the Page pointer observed at read time
pub(crate) read: Shared<'g, Page>,
// the pagetable slot the pointer was loaded from
pub(crate) entry: &'g Atomic<Page>,
}
// SAFETY: NOTE(review) — the epoch guard tied to 'g must keep the
// Page reachable; these impls assert that crossing threads within
// that lifetime is sound.
unsafe impl<'g> Send for PageView<'g> {}
unsafe impl<'g> Sync for PageView<'g> {}
impl<'g> Deref for PageView<'g> {
type Target = Page;
fn deref(&self) -> &Page {
// SAFETY: `read` was loaded under the guard that bounds 'g,
// so the pointee has not been reclaimed.
unsafe { self.read.deref() }
}
}
/// Bookkeeping for one on-disk fragment of a page.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct CacheInfo {
// logical timestamp used to detect concurrent page movement
pub ts: u64,
// log sequence number of the fragment's log entry
pub lsn: Lsn,
// where the fragment lives on disk (inline offset or blob)
pub pointer: DiskPtr,
// bytes this fragment occupies in the log
pub log_size: u64,
}
// Test-only: generate random CacheInfos for quickcheck properties.
#[cfg(test)]
impl quickcheck::Arbitrary for CacheInfo {
fn arbitrary<G: quickcheck::Gen>(g: &mut G) -> CacheInfo {
use rand::Rng;
CacheInfo {
ts: g.gen(),
lsn: g.gen(),
pointer: DiskPtr::arbitrary(g),
log_size: g.gen(),
}
}
}
/// The in-memory payload of a page mutation: an incremental link, a
/// full node replacement, a free, or one of the two system pages
/// (counter / meta). `log_kind_from_update` maps these to log tags.
#[derive(Clone, Debug, PartialEq)]
pub(crate) enum Update {
Link(Link),
Node(Node),
Free,
Counter(u64),
Meta(Meta),
}
impl Update {
    /// Borrows the inner `Node`, panicking on any other variant.
    fn as_node(&self) -> &Node {
        if let Update::Node(node) = self {
            node
        } else {
            panic!("called as_node on non-Node: {:?}", self)
        }
    }

    /// Mutably borrows the inner `Node`, panicking on any other
    /// variant.
    fn as_node_mut(&mut self) -> &mut Node {
        if let Update::Node(node) = self {
            node
        } else {
            panic!("called as_node_mut on non-Node: {:?}", self)
        }
    }

    /// Borrows the inner `Link`, panicking on any other variant.
    fn as_link(&self) -> &Link {
        if let Update::Link(link) = self {
            link
        } else {
            panic!("called as_link on non-Link: {:?}", self)
        }
    }

    /// Borrows the inner `Meta`, panicking on any other variant.
    pub(crate) fn as_meta(&self) -> &Meta {
        match self {
            Update::Meta(meta) => meta,
            other => panic!("called as_meta on {:?}", other),
        }
    }

    /// Copies out the counter value, panicking on any other variant.
    pub(crate) fn as_counter(&self) -> u64 {
        match self {
            Update::Counter(counter) => *counter,
            other => panic!("called as_counter on {:?}", other),
        }
    }

    /// Returns `true` iff this update frees the page.
    fn is_free(&self) -> bool {
        matches!(self, Update::Free)
    }
}
/// Wraps a reserved batch-manifest log slot handed out by `pin_log`.
/// Sealing it records the highest reserved LSN into the manifest,
/// marking the write batch boundary.
#[derive(Debug)]
pub struct RecoveryGuard<'a> {
batch_res: Reservation<'a>,
}
impl<'a> RecoveryGuard<'a> {
/// Writes the current max reserved LSN into the reserved
/// batch-manifest slot, consuming the guard.
pub(crate) fn seal_batch(self) -> Result<()> {
let max_reserved =
self.batch_res.log.iobufs.max_reserved_lsn.load(Acquire);
self.batch_res.mark_writebatch(max_reserved).map(|_| ())
}
}
/// One entry in the pagetable: the materialized in-memory state (if
/// any; `None` means paged out) plus the on-disk fragments that
/// constitute the page. `cache_infos[0]` is the base fragment and the
/// rest are links, as seen in `to_page_state` below.
#[derive(Debug, Clone)]
pub struct Page {
pub(crate) update: Option<Box<Update>>,
pub(crate) cache_infos: Vec<CacheInfo>,
}
impl Page {
pub(crate) fn to_page_state(&self) -> PageState {
let base = &self.cache_infos[0];
if self.is_free() {
PageState::Free(base.lsn, base.pointer)
} else {
let mut frags: Vec<(Lsn, DiskPtr, u64)> = vec![];
for cache_info in self.cache_infos.iter().skip(1) {
frags.push((
cache_info.lsn,
cache_info.pointer,
cache_info.log_size,
));
}
PageState::Present {
base: (base.lsn, base.pointer, base.log_size),
frags,
}
}
}
pub(crate) fn as_node(&self) -> &Node {
self.update.as_ref().unwrap().as_node()
}
pub(crate) fn as_meta(&self) -> &Meta {
self.update.as_ref().unwrap().as_meta()
}
pub(crate) fn as_counter(&self) -> u64 {
self.update.as_ref().unwrap().as_counter()
}
pub(crate) fn is_free(&self) -> bool {
self.update.as_ref().map_or(false, |u| u.is_free())
|| self.cache_infos.is_empty()
}
pub(crate) fn last_lsn(&self) -> Lsn {
self.cache_infos.last().map(|ci| ci.lsn).unwrap()
}
pub(crate) fn log_size(&self) -> u64 {
self.cache_infos.iter().map(|ci| ci.log_size).sum()
}
fn ts(&self) -> u64 {
self.cache_infos.last().map_or(0, |ci| ci.ts)
}
fn lone_blob(&self) -> Option<DiskPtr> {
if self.cache_infos.len() == 1 && self.cache_infos[0].pointer.is_blob()
{
Some(self.cache_infos[0].pointer)
} else {
None
}
}
}
/// The lock-free pagecache: maps `PageId`s to `Page`s backed by the
/// log, handling allocation, linking, replacement, paging in/out via
/// the LRU, and the monotonic ID generator.
pub struct PageCache {
pub(crate) config: RunningConfig,
// pid -> Page slots
inner: PageTable,
// mutex doubles as the allocation serializer (see allocate_inner)
next_pid_to_allocate: Mutex<PageId>,
// max-heap of freed pids available for reuse
free: Arc<Mutex<BinaryHeap<PageId>>>,
#[doc(hidden)]
pub log: Log,
lru: Lru,
// next ID to hand out; runs ahead of what has been persisted
idgen: Arc<AtomicU64>,
// highest ID-generator watermark known to be persisted
idgen_persists: Arc<AtomicU64>,
idgen_persist_mu: Arc<Mutex<()>>,
was_recovered: bool,
}
// SAFETY: NOTE(review) — asserted rather than derived; soundness
// depends on the interior lock-free/locked structures above all being
// safe to share, which cannot be verified from this file alone.
unsafe impl Send for PageCache {}
unsafe impl Sync for PageCache {}
impl Debug for PageCache {
    /// Renders a compact summary: the next pid to allocate and the
    /// current free list.
    fn fmt(
        &self,
        f: &mut fmt::Formatter<'_>,
    ) -> std::result::Result<(), fmt::Error> {
        // write! formats directly into the formatter; the previous
        // `f.write_str(&*format!(...))` allocated an intermediate
        // String for no benefit. Output is byte-identical.
        write!(
            f,
            "PageCache {{ max: {:?} free: {:?} }}\n",
            *self.next_pid_to_allocate.lock(),
            self.free
        )
    }
}
// Test-support: when the event_log feature is on, record the final
// meta and per-page disk pointers at shutdown so a subsequent restart
// can be checked for recovery fidelity.
#[cfg(feature = "event_log")]
impl Drop for PageCache {
fn drop(&mut self) {
use std::collections::HashMap;
trace!("dropping pagecache");
// skip the dump entirely if the system already hit a fatal error
if self.log.iobufs.config.global_error().is_ok() {
let mut pages_before_restart = HashMap::new();
let guard = pin();
self.config.event_log.meta_before_restart(
self.get_meta(&guard)
.expect("should get meta under test")
.deref()
.clone(),
);
for pid in 0..*self.next_pid_to_allocate.lock() {
// pids may have gaps; skip missing entries
let pte = if let Some(pte) = self.inner.get(pid, &guard) {
pte
} else {
continue;
};
let pointers =
pte.cache_infos.iter().map(|ci| ci.pointer).collect();
pages_before_restart.insert(pid, pointers);
}
self.config.event_log.pages_before_restart(pages_before_restart);
}
trace!("pagecache dropped");
}
}
impl PageCache {
/// Starts the pagecache: reads (or defaults) the snapshot, starts
/// the log from it, loads the snapshot into the pagetable, then
/// bootstraps the META and counter system pages if this is a fresh
/// database, and finally primes the ID generator.
pub(crate) fn start(config: RunningConfig) -> Result<Self> {
trace!("starting pagecache");
config.reset_global_error();
let snapshot = read_snapshot_or_default(&config)?;
// Under the testing feature, read the snapshot a second time and
// assert the two reads agree field-by-field: recovery must be
// idempotent.
#[cfg(feature = "testing")]
{
trace!(
"\n\n~~~~ regenerating snapshot for idempotency test ~~~~\n"
);
let snapshot2 = read_snapshot_or_default(&config)
.expect("second read snapshot");
assert_eq!(
snapshot.active_segment, snapshot2.active_segment,
"snapshot active_segment diverged across recoveries.\n\n \
first: {:?}\n\n
second: {:?}\n\n",
snapshot, snapshot2
);
assert_eq!(
snapshot.stable_lsn, snapshot2.stable_lsn,
"snapshot stable_lsn diverged across recoveries.\n\n \
first: {:?}\n\n
second: {:?}\n\n",
snapshot, snapshot2
);
// per-page comparison first, so a divergence points at a pid
for (pid, (p1, p2)) in
snapshot.pt.iter().zip(snapshot2.pt.iter()).enumerate()
{
assert_eq!(
p1, p2,
"snapshot pid {} diverged across recoveries.\n\n \
first: {:?}\n\n
second: {:?}\n\n",
pid, p1, p2
);
}
assert_eq!(
snapshot.pt.len(),
snapshot2.pt.len(),
"snapshots number of pages diverged across recoveries.\n\n \
first: {:?}\n\n
second: {:?}\n\n",
snapshot.pt,
snapshot2.pt
);
assert_eq!(
snapshot, snapshot2,
"snapshots diverged across recoveries.\n\n \
first: {:?}\n\n
second: {:?}\n\n",
snapshot, snapshot2
);
}
let _measure = Measure::new(&M.start_pagecache);
let cache_capacity = config.cache_capacity;
let lru = Lru::new(cache_capacity);
let mut pc = Self {
config: config.clone(),
inner: PageTable::default(),
next_pid_to_allocate: Mutex::new(0),
free: Arc::new(Mutex::new(BinaryHeap::new())),
log: Log::start(config, &snapshot)?,
lru,
idgen_persist_mu: Arc::new(Mutex::new(())),
idgen: Arc::new(AtomicU64::new(0)),
idgen_persists: Arc::new(AtomicU64::new(0)),
was_recovered: false,
};
pc.load_snapshot(&snapshot)?;
// Under testing, record every recovered page's disk pointers so
// they can be compared against what was dumped before restart.
#[cfg(feature = "testing")]
{
use std::collections::HashMap;
let guard = pin();
let mut pages_after_restart = HashMap::new();
for pid in 0..*pc.next_pid_to_allocate.lock() {
let pte = if let Some(pte) = pc.inner.get(pid, &guard) {
pte
} else {
continue;
};
let pointers =
pte.cache_infos.iter().map(|ci| ci.pointer).collect();
pages_after_restart.insert(pid, pointers);
}
pc.config.event_log.pages_after_restart(pages_after_restart);
}
let mut was_recovered = true;
{
let guard = pin();
// A missing META / counter page means this is a fresh database:
// allocate them now and verify they land on their fixed pids.
if let Err(Error::ReportableBug(..)) = pc.get_meta(&guard) {
was_recovered = false;
let meta_update = Update::Meta(Meta::default());
let (meta_id, _) = pc.allocate_inner(meta_update, &guard)?;
assert_eq!(
meta_id, META_PID,
"we expect the meta page to have pid {}, but it had pid {} instead",
META_PID, meta_id,
);
}
if let Err(Error::ReportableBug(..)) = pc.get_idgen(&guard) {
was_recovered = false;
let counter_update = Update::Counter(0);
let (counter_id, _) =
pc.allocate_inner(counter_update, &guard)?;
assert_eq!(
counter_id, COUNTER_PID,
"we expect the counter to have pid {}, but it had pid {} instead",
COUNTER_PID, counter_id,
);
}
let (_, counter) = pc.get_idgen(&guard)?;
// After a recovery, skip two whole persist intervals so no ID
// handed out before the crash can ever be re-issued.
let idgen_recovery = if was_recovered {
counter + (2 * pc.config.idgen_persist_interval)
} else {
0
};
// round the persisted watermark down to the interval boundary
let idgen_persists = counter / pc.config.idgen_persist_interval
* pc.config.idgen_persist_interval;
pc.idgen.store(idgen_recovery, Release);
pc.idgen_persists.store(idgen_persists, Release);
}
pc.was_recovered = was_recovered;
#[cfg(feature = "event_log")]
{
let guard = pin();
pc.config.event_log.meta_after_restart(
pc.get_meta(&guard)
.expect("should be able to get meta under test")
.deref()
.clone(),
);
}
trace!("pagecache started");
Ok(pc)
}
/// Flushes pending log IO buffers to disk, returning the number of
/// bytes written (delegates to `Log::flush`).
pub(crate) fn flush(&self) -> Result<usize> {
self.log.flush()
}
/// Allocates a new page initialized with `new`, returning its pid
/// and a view of the installed page. Thin wrapper over
/// `allocate_inner` with a `Node` payload.
pub(crate) fn allocate<'g>(
&self,
new: Node,
guard: &'g Guard,
) -> Result<(PageId, PageView<'g>)> {
self.allocate_inner(Update::Node(new), guard)
}
/// Allocates a pid — preferring to reuse one from the free list,
/// otherwise minting a fresh one — and installs `new` as its first
/// state via `cas_page`. Panics if the install CAS fails, since
/// nobody else can race on a page that is not yet visible/reusable.
fn allocate_inner<'g>(
&self,
new: Update,
guard: &'g Guard,
) -> Result<(PageId, PageView<'g>)> {
// holds the next_pid lock alive across the fresh-allocation branch
let mut allocation_serializer;
let free_opt = self.free.lock().pop();
let (pid, page_view) = if let Some(pid) = free_opt {
trace!("re-allocating pid {}", pid);
let page_view = match self.inner.get(pid, guard) {
None => panic!(
"expected to find existing stack \
for re-allocated pid {}",
pid
),
Some(p) => p,
};
// a pid on the free list must still be in the freed state
assert!(
page_view.is_free(),
"failed to re-allocate pid {} which \
contained unexpected state {:?}",
pid,
page_view,
);
(pid, page_view)
} else {
// serialize fresh pid allocation under the counter mutex
allocation_serializer = self.next_pid_to_allocate.lock();
let pid = *allocation_serializer;
*allocation_serializer += 1;
trace!("allocating pid {} for the first time", pid);
let new_page = Page { update: None, cache_infos: Vec::default() };
let page_view = self.inner.insert(pid, new_page, guard);
(pid, page_view)
};
let new_pointer = self
.cas_page(pid, page_view, new, false, guard)?
.unwrap_or_else(|e| {
panic!(
"should always be able to install \
a new page during allocation, but \
failed for pid {}: {:?}",
pid, e
)
});
Ok((pid, new_pointer))
}
// Only compiled on platforms with real parallel IO (not miri).
#[cfg(all(
not(miri),
any(
windows,
target_os = "linux",
target_os = "macos",
target_os = "dragonfly",
target_os = "freebsd",
target_os = "openbsd",
target_os = "netbsd",
)
))]
/// Attempts one unit of GC: pops a (pid, segment) pair from the
/// segment cleaner and rewrites that page out of the segment.
/// Returns `Ok(true)` if work was done, `Ok(false)` if the cleaner
/// queue was empty.
pub(crate) fn attempt_gc(&self) -> Result<bool> {
let guard = pin();
// take the shared concurrency-control lock for the duration
let cc = concurrency_control::read();
let to_clean = self.log.iobufs.segment_cleaner.pop();
let ret = if let Some((pid_to_clean, segment_to_clean)) = to_clean {
self.rewrite_page(pid_to_clean, segment_to_clean, &guard)
.map(|_| true)
} else {
Ok(false)
};
drop(cc);
guard.flush();
ret
}
/// Begins an atomic write batch: rolls to a fresh iobuf, reserves a
/// batch-manifest slot (filled in later by `seal_batch`), and kicks
/// off a seal/write of the iobuf so the reservation makes progress.
pub(crate) fn pin_log(&self, guard: &Guard) -> Result<RecoveryGuard<'_>> {
self.log.roll_iobuf()?;
// placeholder manifest; the real max LSN is written at seal time
let batch_res = self.log.reserve(
LogKind::Skip,
BATCH_MANIFEST_PID,
&BatchManifest::default(),
guard,
)?;
iobuf::maybe_seal_and_write_iobuf(
&self.log.iobufs,
&batch_res.iobuf,
batch_res.iobuf.get_header(),
false,
)?;
Ok(RecoveryGuard { batch_res })
}
#[doc(hidden)]
#[cfg(feature = "failpoints")]
#[cfg(all(
not(miri),
any(
windows,
target_os = "linux",
target_os = "macos",
target_os = "dragonfly",
target_os = "freebsd",
target_os = "openbsd",
target_os = "netbsd",
)
))]
/// Test hook: installs a `FailPoint` error as the global error and
/// wakes any threads waiting on interval updates so they observe it.
pub(crate) fn set_failpoint(&self, e: Error) {
if let Error::FailPoint = e {
self.config.set_global_error(e);
// acquire and immediately release the intervals mutex —
// presumably to fence against concurrent writers before
// notifying waiters; confirm against iobuf internals.
let intervals = self.log.iobufs.intervals.lock();
drop(intervals);
let _notified = self.log.iobufs.interval_updated.notify_all();
}
}
/// Marks a page as freed via CAS. On success, the pid is returned to
/// the free list — but only after the epoch guard advances, so
/// concurrent readers of the old state cannot see the pid reused.
/// System pages (counter, meta, batch manifest) cannot be freed.
pub(crate) fn free<'g>(
&self,
pid: PageId,
old: PageView<'g>,
guard: &'g Guard,
) -> Result<CasResult<'g, ()>> {
trace!("attempting to free pid {}", pid);
if pid == COUNTER_PID || pid == META_PID || pid == BATCH_MANIFEST_PID {
return Err(Error::Unsupported(
"you are not able to free the first \
couple pages, which are allocated \
for system internal purposes"
.into(),
));
}
let new_pointer =
self.cas_page(pid, old, Update::Free, false, guard)?;
if new_pointer.is_ok() {
let free = self.free.clone();
// defer the free-list push past the current epoch
guard.defer(move || {
let mut free = free.lock();
// sanity check against double-free bugs
if free.iter().any(|e| e == &pid) {
panic!("pid {} was double-freed", pid);
}
free.push(pid);
});
}
Ok(new_pointer.map_err(|o| o.map(|(pointer, _)| (pointer, ()))))
}
/// Appends a `Link` delta to a page: reserves a log slot, then CASes
/// a new `Page` (old fragments + the new one) into the pagetable.
/// On CAS success the reservation is completed; on failure it is
/// aborted. If the page has accumulated too many fragments, the link
/// is upgraded to a full `replace` (consolidation). Retries the CAS
/// when the page moved without logically changing (same ts).
pub(crate) fn link<'g>(
&'g self,
pid: PageId,
mut old: PageView<'g>,
new: Link,
guard: &'g Guard,
) -> Result<CasResult<'g, Link>> {
let _measure = Measure::new(&M.link_page);
trace!("linking pid {} with {:?}", pid, new);
// Under test, randomly pretend the CAS failed (without touching
// state) to exercise caller retry paths.
#[cfg(any(test, feature = "lock_free_delays"))]
{
use std::cell::RefCell;
use std::time::{SystemTime, UNIX_EPOCH};
thread_local! {
pub static COUNT: RefCell<u32> = RefCell::new(1);
}
let time_now =
SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
#[allow(clippy::cast_possible_truncation)]
let fail_seed = std::cmp::max(3, time_now.as_nanos() as u32 % 128);
let inject_failure = COUNT.with(|c| {
let mut cr = c.borrow_mut();
*cr += 1;
*cr % fail_seed == 0
});
if inject_failure {
debug!(
"injecting a randomized failure in the link of pid {}",
pid
);
if let Some(current_pointer) = self.get(pid, guard)? {
return Ok(Err(Some((current_pointer.0, new))));
} else {
return Ok(Err(None));
}
}
}
// apply the delta to an owned copy of the current node
let mut node: Node = old.as_node().clone();
node.apply(&new);
// too many fragments: consolidate via a full replace instead
if old.cache_infos.len() >= PAGE_CONSOLIDATION_THRESHOLD {
let short_circuit = self.replace(pid, old, node, guard)?;
return Ok(short_circuit.map_err(|a| a.map(|b| (b.0, new))));
}
// reused across retries so the allocation happens only once
let mut new_page = Some(Owned::new(Page {
update: Some(Box::new(Update::Node(node))),
cache_infos: Vec::default(),
}));
loop {
// 1) reserve a log slot for the delta
let log_reservation =
self.log.reserve(LogKind::Link, pid, &new, guard)?;
let lsn = log_reservation.lsn();
let pointer = log_reservation.pointer();
// bump ts so concurrent CASers can detect a logical change
let ts = old.ts() + 1;
let cache_info = CacheInfo {
lsn,
pointer,
ts,
log_size: log_reservation.reservation_len() as u64,
};
let mut new_cache_infos =
Vec::with_capacity(old.cache_infos.len() + 1);
new_cache_infos.extend_from_slice(&old.cache_infos);
new_cache_infos.push(cache_info);
let mut page_ptr = new_page.take().unwrap();
page_ptr.cache_infos = new_cache_infos;
debug_delay();
// 2) publish the new page state
let result =
old.entry.compare_and_set(old.read, page_ptr, SeqCst, guard);
match result {
Ok(new_shared) => {
trace!("link of pid {} succeeded", pid);
// SAFETY: old.read was observed under this guard and is
// now unreachable; reclaim after the epoch advances.
unsafe {
guard.defer_destroy(old.read);
}
assert_ne!(old.last_lsn(), 0);
self.log.iobufs.sa_mark_link(pid, cache_info, guard);
// 3) only complete the reservation after the CAS wins
log_reservation.complete()?;
let total_page_size =
unsafe { new_shared.deref().log_size() };
let to_evict =
self.lru.accessed(pid, total_page_size, guard);
trace!(
"accessed pid {} -> paging out pids {:?}",
pid,
to_evict
);
if !to_evict.is_empty() {
self.page_out(to_evict, guard)?;
}
old.read = new_shared;
return Ok(Ok(old));
}
Err(cas_error) => {
// lost the race: release the log slot first
log_reservation.abort()?;
let actual = cas_error.current;
let actual_ts = unsafe { actual.deref().ts() };
if actual_ts == old.ts() {
// same ts: the page merely moved (e.g. was paged
// in/out); retry against the new pointer
trace!(
"link of pid {} failed due to movement, retrying",
pid
);
new_page = Some(cas_error.new);
old.read = actual;
} else {
// real concurrent update: report failure to caller
trace!("link of pid {} failed due to new update", pid);
let mut page_view = old;
page_view.read = actual;
return Ok(Err(Some((page_view, new))));
}
}
}
}
}
/// Replaces a page's entire contents with `new` via `cas_page`.
/// Afterwards, opportunistically performs one unit of segment GC if
/// the cleaner has work queued.
pub(crate) fn replace<'g>(
&self,
pid: PageId,
old: PageView<'g>,
new: Node,
guard: &'g Guard,
) -> Result<CasResult<'g, Node>> {
let _measure = Measure::new(&M.replace_page);
trace!("replacing pid {} with {:?}", pid, new);
// Under test, randomly pretend the CAS failed (without touching
// state) to exercise caller retry paths.
#[cfg(any(test, feature = "lock_free_delays"))]
{
use std::cell::RefCell;
use std::time::{SystemTime, UNIX_EPOCH};
thread_local! {
pub static COUNT: RefCell<u32> = RefCell::new(1);
}
let time_now =
SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
#[allow(clippy::cast_possible_truncation)]
let fail_seed = std::cmp::max(3, time_now.as_nanos() as u32 % 128);
let inject_failure = COUNT.with(|c| {
let mut cr = c.borrow_mut();
*cr += 1;
*cr % fail_seed == 0
});
if inject_failure {
debug!(
"injecting a randomized failure in the replace of pid {}",
pid
);
if let Some(current_pointer) = self.get(pid, guard)? {
return Ok(Err(Some((current_pointer.0, new))));
} else {
return Ok(Err(None));
}
}
}
let result =
self.cas_page(pid, old, Update::Node(new), false, guard)?;
// piggyback one GC rewrite on this write, if any is queued
if let Some((pid_to_clean, segment_to_clean)) =
self.log.iobufs.segment_cleaner.pop()
{
self.rewrite_page(pid_to_clean, segment_to_clean, guard)?;
}
Ok(result.map_err(|fail| {
let (pointer, shared) = fail.unwrap();
// cas_page hands back the rejected update; unwrap the Node
if let Update::Node(rejected_new) = shared {
Some((pointer, rejected_new))
} else {
unreachable!();
}
}))
}
/// Relocates a page out of a segment that is being cleaned. A page
/// consisting of a single blob fragment only needs its blob pointer
/// rewritten; anything else is rewritten wholesale via `cas_page`
/// with `is_rewrite = true` (same logical content, new location).
/// Loops until the page no longer references the purged segment.
fn rewrite_page(
&self,
pid: PageId,
segment_to_purge: LogOffset,
guard: &Guard,
) -> Result<()> {
let _measure = Measure::new(&M.rewrite_page);
trace!("rewriting pid {}", pid);
let purge_segment_id =
segment_to_purge / self.config.segment_size as u64;
loop {
let page_view = if let Some(page_view) = self.inner.get(pid, guard)
{
page_view
} else {
panic!("rewriting pid {} failed (no longer exists)", pid);
};
// someone else may have already relocated it; nothing to do
let already_moved = !unsafe { page_view.read.deref() }
.cache_infos
.iter()
.any(|ce| {
ce.pointer.lid() / self.config.segment_size as u64
== purge_segment_id
});
if already_moved {
return Ok(());
}
if let Some(disk_pointer) = page_view.lone_blob() {
// cheap path: only the small blob-pointer log entry moves;
// the blob file itself stays where it is
trace!("rewriting blob with pid {}", pid);
let blob_pointer = disk_pointer.blob().1;
let log_reservation =
self.log.rewrite_blob_pointer(pid, blob_pointer, guard)?;
// ts unchanged: the page's logical content is identical
let cache_info = CacheInfo {
ts: page_view.ts(),
lsn: log_reservation.lsn,
pointer: log_reservation.pointer,
log_size: u64::try_from(log_reservation.reservation_len())
.unwrap(),
};
let new_page = Owned::new(Page {
update: page_view.update.clone(),
cache_infos: vec![cache_info],
});
debug_delay();
let result = page_view.entry.compare_and_set(
page_view.read,
new_page,
SeqCst,
guard,
);
if let Ok(new_shared) = result {
// SAFETY: the displaced pointer was read under this
// guard; reclaim after the epoch advances.
unsafe {
guard.defer_destroy(page_view.read);
}
let lsn = log_reservation.lsn();
self.log.iobufs.sa_mark_replace(
pid,
lsn,
&page_view.cache_infos,
cache_info,
guard,
)?;
// only complete the reservation once the CAS has won
let _pointer = log_reservation.complete()?;
let total_page_size =
unsafe { new_shared.deref().log_size() };
let to_evict =
self.lru.accessed(pid, total_page_size, guard);
trace!(
"accessed pid {} -> paging out pids {:?}",
pid,
to_evict
);
if !to_evict.is_empty() {
self.page_out(to_evict, guard)?;
}
trace!("rewriting pid {} succeeded", pid);
return Ok(());
} else {
let _pointer = log_reservation.abort()?;
trace!("rewriting pid {} failed", pid);
// fall through and retry from the top of the loop
}
} else {
// general path: materialize the page's current state and
// rewrite it in full at a new location
trace!("rewriting page with pid {}", pid);
let (key, update): (_, Update) = if pid == META_PID {
let meta_view = self.get_meta(guard)?;
(meta_view.0, Update::Meta(meta_view.deref().clone()))
} else if pid == COUNTER_PID {
let (key, counter) = self.get_idgen(guard)?;
(key, Update::Counter(counter))
} else if let Some(node_view) = self.get(pid, guard)? {
(node_view.0, Update::Node(node_view.deref().clone()))
} else {
// get() returned None: the page is (or was) free
let page_view = match self.inner.get(pid, guard) {
None => panic!("expected page missing in rewrite"),
Some(p) => p,
};
if page_view.is_free() {
(page_view, Update::Free)
} else {
debug!(
"when rewriting pid {} \
we encountered a rewritten \
node with a link {:?} that \
we previously witnessed a Free \
for (PageCache::get returned None), \
assuming we can just return now since \
the Free was replace'd",
pid, page_view.update
);
return Ok(());
}
};
// is_rewrite=true keeps the ts stable (no logical change)
let res = self.cas_page(pid, key, update, true, guard).map(
|res| {
trace!(
"rewriting pid {} success: {}",
pid,
res.is_ok()
);
res
},
)?;
if res.is_ok() {
return Ok(());
}
// CAS lost; retry from the top of the loop
}
}
}
#[allow(clippy::cast_precision_loss)]
#[allow(clippy::float_arithmetic)]
#[doc(hidden)]
/// Ratio of bytes physically on disk to the logical size of all
/// pages (padded by one segment of expected slack).
pub(crate) fn space_amplification(&self) -> Result<f64> {
    let physical = self.size_on_disk()? as f64;
    let logical = (self.logical_size_of_all_pages()?
        + self.config.segment_size as u64) as f64;
    Ok(physical / logical)
}
/// Total bytes on disk: the main log file plus every file in the
/// blob directory. Unreadable directory entries are skipped
/// (best-effort accounting, not an error).
pub(crate) fn size_on_disk(&self) -> Result<u64> {
let mut size = self.config.file.metadata()?.len();
// blob_path(0) is only used to locate the blob directory
let stable = self.config.blob_path(0);
let blob_dir = stable.parent().expect(
"should be able to determine the parent for the blob directory",
);
let blob_files = std::fs::read_dir(blob_dir)?;
for blob_file in blob_files {
let blob_file = if let Ok(bf) = blob_file {
bf
} else {
continue;
};
// DirEntry::metadata is unsupported under miri, hence the
// fs::metadata fallback below
#[cfg(not(miri))]
{
size += blob_file.metadata().map(|m| m.len()).unwrap_or(0);
}
#[cfg(miri)]
{
size += std::fs::metadata(blob_file.path())
.map(|m| m.len())
.unwrap_or(0);
}
}
Ok(size)
}
/// Sums the resident logical size of every page: the meta page, the
/// 8-byte ID counter, and each user page's `rss()`.
fn logical_size_of_all_pages(&self) -> Result<u64> {
    let guard = pin();
    // start from the two system pages: meta plus the u64 counter
    let mut total =
        self.get_meta(&guard)?.rss() + std::mem::size_of::<u64>() as u64;
    let first_user_pid = COUNTER_PID + 1;
    let last_pid = *self.next_pid_to_allocate.lock();
    for pid in first_user_pid..last_pid {
        // freed / missing pages contribute nothing
        if let Some(node) = self.get(pid, &guard)? {
            total += node.rss();
        }
    }
    Ok(total)
}
/// The core replace protocol: reserve a log slot for `update`,
/// build a fresh single-fragment `Page`, and CAS it into the
/// pagetable. Completes the reservation only after the CAS wins,
/// aborting it on loss. When the loser's ts matches (page moved but
/// didn't logically change) the CAS is retried unless this is a
/// rewrite, in which case any movement means the rewrite is moot.
fn cas_page<'g>(
&self,
pid: PageId,
mut old: PageView<'g>,
update: Update,
is_rewrite: bool,
guard: &'g Guard,
) -> Result<CasResult<'g, Update>> {
trace!(
"cas_page called on pid {} to {:?} with old ts {:?}",
pid,
update,
old.ts()
);
let log_kind = log_kind_from_update(&update);
trace!("cas_page on pid {} has log kind: {:?}", pid, log_kind);
// allocated once; ownership shuttles between us and the CAS
let mut new_page = Some(Owned::new(Page {
update: Some(Box::new(update)),
cache_infos: Vec::default(),
}));
loop {
let mut page_ptr = new_page.take().unwrap();
// 1) reserve log space appropriate to the payload variant
let log_reservation = match &**page_ptr.update.as_ref().unwrap() {
Update::Counter(ref c) => {
self.log.reserve(log_kind, pid, c, guard)?
}
Update::Meta(ref m) => {
self.log.reserve(log_kind, pid, m, guard)?
}
Update::Free => self.log.reserve(log_kind, pid, &(), guard)?,
Update::Node(ref node) => {
self.log.reserve(log_kind, pid, node, guard)?
}
other => {
panic!("non-replacement used in cas_page: {:?}", other)
}
};
let lsn = log_reservation.lsn();
let new_pointer = log_reservation.pointer();
// rewrites preserve ts (no logical change); real updates bump it
let ts = if is_rewrite { old.ts() } else { old.ts() + 1 };
let cache_info = CacheInfo {
ts,
lsn,
pointer: new_pointer,
log_size: u64::try_from(log_reservation.reservation_len())
.unwrap(),
};
page_ptr.cache_infos = vec![cache_info];
debug_delay();
// 2) attempt to publish
let result =
old.entry.compare_and_set(old.read, page_ptr, SeqCst, guard);
match result {
Ok(new_shared) => {
// SAFETY: displaced pointer was observed under this
// guard; reclaim after the epoch advances.
unsafe {
guard.defer_destroy(old.read);
}
trace!("cas_page succeeded on pid {}", pid);
self.log.iobufs.sa_mark_replace(
pid,
lsn,
&old.cache_infos,
cache_info,
guard,
)?;
// 3) complete the reservation only after winning
let _pointer = log_reservation.complete()?;
let total_page_size =
unsafe { new_shared.deref().log_size() };
let to_evict =
self.lru.accessed(pid, total_page_size, guard);
trace!(
"accessed pid {} -> paging out pids {:?}",
pid,
to_evict
);
if !to_evict.is_empty() {
self.page_out(to_evict, guard)?;
}
return Ok(Ok(PageView {
read: new_shared,
entry: old.entry,
}));
}
Err(cas_error) => {
trace!("cas_page failed on pid {}", pid);
// release the log slot before deciding what to do
let _pointer = log_reservation.abort()?;
let current: Shared<'_, _> = cas_error.current;
let actual_ts = unsafe { current.deref().ts() };
// recover ownership of our Page to reuse or hand back
let mut returned_update: Owned<_> = cas_error.new;
if actual_ts != old.ts() || is_rewrite {
// logically changed (or we're a rewrite): give up,
// returning the rejected update to the caller
return Ok(Err(Some((
PageView { read: current, entry: old.entry },
*returned_update.update.take().unwrap(),
))));
}
trace!(
"retrying CAS on pid {} with same ts of {}",
pid,
old.ts()
);
old.read = current;
new_page = Some(returned_update);
}
} } }
pub(crate) fn get_meta<'g>(
&self,
guard: &'g Guard,
) -> Result<MetaView<'g>> {
trace!("getting page iter for META");
let page_view = match self.inner.get(META_PID, guard) {
None => {
return Err(Error::ReportableBug(
"failed to retrieve META page \
which should always be present"
.into(),
));
}
Some(p) => p,
};
if page_view.update.is_some() {
Ok(MetaView(page_view))
} else {
Err(Error::ReportableBug(
"failed to retrieve META page \
which should always be present"
.into(),
))
}
}
pub(crate) fn get_idgen<'g>(
&self,
guard: &'g Guard,
) -> Result<(PageView<'g>, u64)> {
trace!("getting page iter for idgen");
let page_view = match self.inner.get(COUNTER_PID, guard) {
None => {
return Err(Error::ReportableBug(
"failed to retrieve counter page \
which should always be present"
.into(),
));
}
Some(p) => p,
};
if page_view.update.is_some() {
let counter = page_view.as_counter();
Ok((page_view, counter))
} else {
Err(Error::ReportableBug(
"failed to retrieve counter page \
which should always be present"
.into(),
))
}
}
/// Fetches a user page, materializing it from disk if it is paged
/// out: all fragments are pulled, links are applied onto the base,
/// and the merged state is CASed back into the pagetable as a
/// fix-up. Returns `Ok(None)` for missing or freed pages. System
/// pages must use `get_meta` / `get_idgen` instead.
pub(crate) fn get<'g>(
&self,
pid: PageId,
guard: &'g Guard,
) -> Result<Option<NodeView<'g>>> {
trace!("getting page iterator for pid {}", pid);
let _measure = Measure::new(&M.get_page);
if pid == COUNTER_PID || pid == META_PID || pid == BATCH_MANIFEST_PID {
return Err(Error::Unsupported(
"you are not able to iterate over \
the first couple pages, which are \
reserved for storing metadata and \
monotonic ID generator info"
.into(),
));
}
// loop-termination state: if we attempt to pull the exact same
// base fragment twice and fail, surface the last error instead of
// spinning forever
let mut last_attempted_cache_info = None;
let mut last_err = None;
let mut page_view;
let mut updates: Vec<Update> = loop {
page_view = match self.inner.get(pid, guard) {
None => return Ok(None),
Some(p) => p,
};
if page_view.is_free() {
return Ok(None);
}
if page_view.update.is_some() {
// already materialized: just touch the LRU and return
let total_page_size = page_view.log_size();
let to_evict = self.lru.accessed(pid, total_page_size, guard);
trace!(
"accessed pid {} -> paging out pids {:?}",
pid,
to_evict
);
if !to_evict.is_empty() {
self.page_out(to_evict, guard)?;
}
return Ok(Some(NodeView(page_view)));
}
trace!(
"pulling pid {} view {:?} deref {:?}",
pid,
page_view,
page_view.deref()
);
if page_view.cache_infos.first()
== last_attempted_cache_info.as_ref()
{
return Err(last_err.unwrap());
} else {
last_attempted_cache_info =
page_view.cache_infos.first().copied();
}
// pull every fragment from disk; any failure retries the loop
let updates_result: Result<Vec<Update>> = page_view
.cache_infos
.iter()
.map(|ci| self.pull(pid, ci.lsn, ci.pointer))
.collect();
last_err = if let Ok(updates) = updates_result {
break updates;
} else {
Some(updates_result.unwrap_err())
};
};
// merge: updates[0] is the base node, the rest are links
let (base_slice, links) = updates.split_at_mut(1);
let base: &mut Node = base_slice[0].as_node_mut();
for link_update in links {
let link: &Link = link_update.as_link();
base.apply(link);
}
updates.truncate(1);
let base = updates.pop().unwrap();
// fix-up CAS: publish the materialized page so later readers
// skip the pull
let page = Owned::new(Page {
update: Some(Box::new(base)),
cache_infos: page_view.cache_infos.clone(),
});
debug_delay();
let result = page_view.entry.compare_and_set(
page_view.read,
page,
SeqCst,
guard,
);
if let Ok(new_shared) = result {
trace!("fix-up for pid {} succeeded", pid);
// SAFETY: displaced pointer was observed under this guard;
// reclaim after the epoch advances.
unsafe {
guard.defer_destroy(page_view.read);
}
let total_page_size = unsafe { new_shared.deref().log_size() };
let to_evict = self.lru.accessed(pid, total_page_size, guard);
trace!("accessed pid {} -> paging out pids {:?}", pid, to_evict);
if !to_evict.is_empty() {
self.page_out(to_evict, guard)?;
}
let mut page_view = page_view;
page_view.read = new_shared;
Ok(Some(NodeView(page_view)))
} else {
trace!("fix-up for pid {} failed", pid);
// someone raced us; restart from scratch
self.get(pid, guard)
}
}
/// Whether this pagecache was recovered from existing data at
/// startup (set once in `start` and immutable afterwards).
pub const fn was_recovered(&self) -> bool {
self.was_recovered
}
/// Hands out the next monotonic ID. IDs are persisted in batches of
/// `idgen_persist_interval`: when the returned ID crosses a
/// watermark that has not been persisted yet, the counter page is
/// durably updated (serialized under `idgen_persist_mu`) before
/// returning, so a crash can never reissue an ID.
pub(crate) fn generate_id_inner(&self) -> Result<u64> {
let ret = self.idgen.fetch_add(1, Release);
trace!("generating ID {}", ret);
let interval = self.config.idgen_persist_interval;
// the watermark this ID requires to have been persisted
let necessary_persists = ret / interval * interval;
let mut persisted = self.idgen_persists.load(Acquire);
while persisted < necessary_persists {
// double-checked under the mutex: another thread may have
// already advanced the watermark
let _mu = self.idgen_persist_mu.lock();
persisted = self.idgen_persists.load(Acquire);
if persisted < necessary_persists {
trace!(
"persisting ID gen, as persist count {} \
is below necessary persists {}",
persisted,
necessary_persists
);
let guard = pin();
let (key, current) = self.get_idgen(&guard)?;
assert_eq!(current, persisted);
let counter_update = Update::Counter(necessary_persists);
let old = self.idgen_persists.swap(necessary_persists, Release);
assert_eq!(old, persisted);
// a lost CAS means another thread moved the counter; loop
// and re-evaluate the watermark
if self
.cas_page(COUNTER_PID, key, counter_update, false, &guard)?
.is_err()
{
continue;
}
// make the counter write durable before handing out the ID
iobuf::make_durable(&self.log.iobufs, key.last_lsn())?;
}
}
Ok(ret)
}
pub(crate) fn meta_pid_for_name(
&self,
name: &[u8],
guard: &Guard,
) -> Result<PageId> {
let m = self.get_meta(guard)?;
if let Some(root) = m.get_root(name) {
Ok(root)
} else {
Err(Error::CollectionNotFound(name.into()))
}
}
/// Atomically swap the root page id registered under `name` in the
/// META page from `old` to `new` (`None` meaning "absent").
///
/// Returns `Ok(Err(actual))` when the currently-registered root did
/// not match `old`; retries internally on CAS races against META.
pub(crate) fn cas_root_in_meta<'g>(
    &self,
    name: &[u8],
    old: Option<PageId>,
    new: Option<PageId>,
    guard: &'g Guard,
) -> Result<std::result::Result<(), Option<PageId>>> {
    loop {
        let meta_view = self.get_meta(guard)?;
        // bail out if the caller's expectation is already stale
        let actual = meta_view.get_root(name);
        if actual != old {
            return Ok(Err(actual));
        }
        // build the replacement Meta with the root set or removed
        let mut updated_meta = meta_view.deref().clone();
        match new {
            Some(new_root) => updated_meta.set_root(name.into(), new_root),
            None => updated_meta.del_root(name),
        }
        let install_result = self.cas_page(
            META_PID,
            meta_view.0,
            Update::Meta(updated_meta),
            false,
            guard,
        )?;
        match install_result {
            Ok(_worked) => return Ok(Ok(())),
            // lost a race installing the new META page; loop and retry
            Err(Some((_current_pointer, _rejected))) => {}
            Err(None) => {
                return Err(Error::ReportableBug(
                    "replacing the META page has failed because \
                    the pagecache does not think it currently exists."
                        .into(),
                ));
            }
        }
    }
}
/// Evict the materialized in-memory update for each pid in
/// `to_evict`, keeping only its cache metadata resident. The three
/// system pages (counter, meta, batch manifest) are never paged out.
fn page_out(&self, to_evict: Vec<PageId>, guard: &Guard) -> Result<()> {
    let _measure = Measure::new(&M.page_out);
    for pid in to_evict {
        let is_system_page = pid == COUNTER_PID
            || pid == META_PID
            || pid == BATCH_MANIFEST_PID;
        if is_system_page {
            continue;
        }
        loop {
            let page_view = match self.inner.get(pid, guard) {
                Some(pv) => pv,
                // raced with a concurrent change; retry the lookup
                None => continue,
            };
            // freed pages have nothing resident worth evicting
            if page_view.is_free() {
                break;
            }
            // same cache metadata, but no materialized update
            let evicted = Owned::new(Page {
                update: None,
                cache_infos: page_view.cache_infos.clone(),
            });
            debug_delay();
            let swapped = page_view
                .entry
                .compare_and_set(page_view.read, evicted, SeqCst, guard)
                .is_ok();
            if swapped {
                // SAFETY: the old shared pointer was unlinked by a
                // successful CAS, so no new readers can acquire it;
                // epoch reclamation defers the free past current pins.
                unsafe {
                    guard.defer_destroy(page_view.read);
                }
                break;
            }
            // CAS lost a race; loop until this page is evicted
        }
    }
    Ok(())
}
/// Read a page fragment back from the log at `lsn`/`pointer` and
/// deserialize it into an `Update`, validating that the message on
/// disk actually belongs to `pid` and to the expected segment.
fn pull(&self, pid: PageId, lsn: Lsn, pointer: DiskPtr) -> Result<Update> {
use MessageKind::*;
trace!("pulling pid {} lsn {} pointer {} from disk", pid, lsn, pointer);
let _measure = Measure::new(&M.pull);
// the segment this lsn falls into; mismatches indicate a stale or
// torn read of the log location
let expected_segment_number: SegmentNumber = SegmentNumber(
u64::try_from(lsn).unwrap()
/ u64::try_from(self.config.segment_size).unwrap(),
);
// read the raw message; only Inline and Blob reads carry a payload
// we can deserialize — anything else is treated as corruption
let (header, bytes) = match self.log.read(pid, lsn, pointer) {
Ok(LogRead::Inline(header, buf, _len)) => {
assert_eq!(
header.pid, pid,
"expected pid {} on pull of pointer {}, \
but got {} instead",
pid, pointer, header.pid
);
assert_eq!(
header.segment_number, expected_segment_number,
"expected segment number {:?} on pull of pointer {}, \
but got segment number {:?} instead",
expected_segment_number, pointer, header.segment_number
);
Ok((header, buf))
}
Ok(LogRead::Blob(header, buf, _blob_pointer, _inline_len)) => {
assert_eq!(
header.pid, pid,
"expected pid {} on pull of pointer {}, \
but got {} instead",
pid, pointer, header.pid
);
assert_eq!(
header.segment_number, expected_segment_number,
"expected segment number {:?} on pull of pointer {}, \
but got segment number {:?} instead",
expected_segment_number, pointer, header.segment_number
);
Ok((header, buf))
}
Ok(other) => {
debug!("read unexpected page: {:?}", other);
Err(Error::corruption(Some(pointer)))
}
Err(e) => {
debug!("failed to read page: {:?}", e);
Err(e)
}
}?;
// `&mut &[u8]` cursor so deserialization can consume bytes
// incrementally without taking ownership of the buffer
let buf = &mut bytes.as_slice();
let update_res = {
let _deserialize_latency = Measure::new(&M.deserialize);
// dispatch on the persisted message kind; the last arm covers
// kinds that are never stored as page fragments, so reaching it
// means a broken invariant rather than a recoverable error
match header.kind {
Counter => u64::deserialize(buf).map(Update::Counter),
BlobMeta | InlineMeta => {
Meta::deserialize(buf).map(Update::Meta)
}
BlobLink | InlineLink => {
Link::deserialize(buf).map(Update::Link)
}
BlobNode | InlineNode => {
Node::deserialize(buf).map(Update::Node)
}
Free => Ok(Update::Free),
Corrupted | Canceled | Cap | BatchManifest => {
panic!("unexpected pull: {:?}", header.kind)
}
}
};
let update = update_res.expect("failed to deserialize data");
// Free markers are handled via page state, not pulled as data;
// surfacing one here is reported as a bug rather than returned
if let Update::Free = update {
Err(Error::ReportableBug(format!(
"non-link/replace found in pull of pid {}",
pid
)))
} else {
Ok(update)
}
}
/// Rebuild the in-memory page table from a recovered `Snapshot`:
/// installs one `Page` per pid in `snapshot.pt`, repopulates the
/// free list, and eagerly pulls the META and COUNTER pages.
fn load_snapshot(&mut self, snapshot: &Snapshot) -> Result<()> {
// pids are allocated densely, so the snapshot length is the bound
let next_pid_to_allocate = snapshot.pt.len() as PageId;
self.next_pid_to_allocate = Mutex::new(next_pid_to_allocate);
debug!("load_snapshot loading pages from 0..{}", next_pid_to_allocate);
for pid in 0..next_pid_to_allocate {
let state = if let Some(state) =
snapshot.pt.get(usize::try_from(pid).unwrap())
{
state
} else {
// unreachable given the bound above; a gap means the
// snapshot itself is inconsistent
panic!(
"load_snapshot pid {} not found, despite being below the max pid {}",
pid, next_pid_to_allocate
);
};
trace!("load_snapshot pid {} {:?}", pid, state);
let mut cache_infos = Vec::default();
let guard = pin();
match *state {
// a live page: base fragment first, then its link frags,
// all with ts 0 since recovery resets logical timestamps
PageState::Present { base, ref frags } => {
cache_infos.push(CacheInfo {
lsn: base.0,
pointer: base.1,
log_size: base.2,
ts: 0,
});
for (lsn, pointer, sz) in frags {
let cache_info = CacheInfo {
lsn: *lsn,
pointer: *pointer,
log_size: *sz,
ts: 0,
};
cache_infos.push(cache_info);
}
}
// a freed page: record its tombstone location and make the
// pid available for reallocation
PageState::Free(lsn, pointer) => {
trace!("load_snapshot freeing pid {}", pid);
let cache_info = CacheInfo {
lsn,
pointer,
// tombstones carry no payload; account for a maximal
// message header as their log footprint
log_size: u64::try_from(MAX_MSG_HEADER_LEN).unwrap(),
ts: 0,
};
cache_infos.push(cache_info);
self.free.lock().push(pid);
}
_ => panic!("tried to load a {:?}", state),
}
trace!("installing page for pid {}", pid);
// META and COUNTER are needed immediately, so materialize them
// now; freed pages get an explicit Free update; everything else
// stays unmaterialized until first access pulls it from disk
let update = if pid == META_PID || pid == COUNTER_PID {
let update =
self.pull(pid, cache_infos[0].lsn, cache_infos[0].pointer)?;
Some(Box::new(update))
} else if state.is_free() {
Some(Box::new(Update::Free))
} else {
None
};
let page = Page { update, cache_infos };
self.inner.insert(pid, page, &guard);
}
Ok(())
}
#[allow(unused)]
/// Capture a `Snapshot` of the page table, spinning on each pid
/// until a page view with non-empty cache info becomes visible.
fn take_fuzzy_snapshot(self) -> Snapshot {
    let stable_lsn_now: Lsn = self.log.stable_offset();
    let pid_bound = *self.next_pid_to_allocate.lock();
    let pid_bound_usize = assert_usize(pid_bound);
    let mut page_states = Vec::<PageState>::with_capacity(pid_bound_usize);
    let guard = pin();
    for pid in 0..pid_bound {
        loop {
            match self.inner.get(pid, &guard) {
                Some(pg_view) if !pg_view.cache_infos.is_empty() => {
                    page_states.push(pg_view.to_page_state());
                    break;
                }
                // page missing or not fully installed yet; yield
                // and re-check until it becomes observable
                _ => std::thread::yield_now(),
            }
        }
    }
    Snapshot {
        stable_lsn: Some(stable_lsn_now),
        active_segment: None,
        pt: page_states,
    }
}
}