//! The `SegmentAccountant` is an allocator for equally-
//! sized chunks of the underlying storage file (segments).
//!
//! It must maintain these critical safety properties:
//!
//! A. We must not overwrite existing segments when they
//! contain the most-recent stable state for a page.
//! B. We must not overwrite existing segments when active
//! threads may have references to `LogId`s that point
//! into those segments.
//!
//! To complicate matters, the `PageCache` only knows
//! when it has put a page into an IO buffer, but it
//! doesn't keep track of when that IO buffer is
//! stabilized (until write coalescing is implemented).
//!
//! To address these safety concerns, we rely on
//! these techniques:
//!
//! 1. We delay the reuse of any existing segment
//! by ensuring there are at least <# io buffers>
//! freed segments in front of the newly freed
//! segment in the free list. This ensures that
//! any pending IO buffer writes will hit
//! stable storage before we overwrite the
//! segment that may have contained the previous
//! latest stable copy of a page's state.
//! 2. We use an `epoch::Guard::defer()` that guarantees
//! any segment that has been logically freed
//! or emptied by the `PageCache` will have its
//! addition to the free segment list be delayed
//! until any active threads that were acting on
//! the shared state have checked-out.
//!
//! Another concern that arises due to the fact that
//! IO buffers may be written out-of-order is the
//! correct recovery of segments. If there is data
//! loss in recently written segments, we must be
//! careful to preserve linearizability in the log.
//! To do this, we must detect "torn segments" that
//! could not be fully written before a crash
//! happened. We detect individual torn segments by
//! writing a `SegmentTrailer` to the end of the
//! segment AFTER we have sync'd it. If the trailer
//! is not present during recovery, the recovery
//! process will not continue into a segment that
//! may contain logically later data.
//!
//! But what if we wrote a later segment, and its
//! trailer, before we were able to write its
//! immediate predecessor segment, and then a
//! crash happened? We must preserve linearizability,
//! so we cannot accidentally recover the later
//! segment when its predecessor was lost in the crash.
//!
//! 3. This case is also handled by the <# io buffers>
//! delay before reuse described above. We guarantee
//! that the last <# io buffers> segments will be
//! present, from which we can deduce the "previous log
//! sequence number pointer". During recovery, if these
//! previous segment Lsn pointers don't match up, we know
//! we have encountered a lost segment, and we will not
//! continue the recovery past the detected gap.
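//!
//! For example (illustrative numbers): with an io_buf_size
//! of 1024, the base lsns of the last <# io buffers>
//! recovered segments should each differ from their
//! predecessor by exactly 1024. If recovery finds segments
//! with lsns 4096 and 6144 but none with lsn 5120, the
//! missing link reveals a lost segment, and recovery stops
//! at 4096 rather than resurfacing the logically later
//! data at 6144.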
use std::collections::{BTreeMap, BTreeSet, HashSet, VecDeque};
use std::fs::File;
use std::mem;
use std::sync::{Arc, Mutex};
use self::reader::LogReader;
use super::*;
/// The segment accountant keeps track of the logical blocks
/// of storage. It scans through all segments quickly during
/// recovery and attempts to locate torn segments.
#[derive(Debug)]
pub(super) struct SegmentAccountant {
// static or one-time set
config: Config,
// TODO these should be sharded to improve performance
segments: Vec<Segment>,
clean_counter: usize,
// TODO put behind a single mutex
// NB MUST group pause_rewriting with ordering
// and free!
free: Arc<Mutex<VecDeque<(LogId, bool)>>>,
tip: LogId,
to_clean: BTreeSet<LogId>,
pause_rewriting: bool,
safety_buffer: Vec<LogId>,
ordering: BTreeMap<Lsn, LogId>,
}
/// A `Segment` holds the bookkeeping information for
/// a contiguous block of the disk. It may contain many
/// fragments from different pages. Over time, we track
/// when segments become reusable and allow them to be
/// overwritten for new data.
#[derive(Default, Debug, PartialEq, Clone, Serialize, Deserialize)]
struct Segment {
present: BTreeSet<PageId>,
removed: HashSet<PageId>,
deferred_remove: HashSet<PageId>,
deferred_rm_blob: HashSet<BlobPointer>,
lsn: Option<Lsn>,
state: SegmentState,
}
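/// The life cycle of a `Segment`:
/// `Free` -> `Active` -> `Inactive` -> `Draining` -> `Free`.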
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
enum SegmentState {
/// the segment is marked for reuse and should never
/// receive new pids,
/// TODO consider: but may receive removals for pids that were
/// already removed?
Free,
/// the segment is being written to or actively recovered, and
/// will have pages assigned to it
Active,
/// the segment is no longer being written to or recovered, and
/// will have pages marked as relocated from it
Inactive,
/// the segment is having its resident pages relocated before
/// becoming free
Draining,
}
use self::SegmentState::*;
impl Default for SegmentState {
fn default() -> SegmentState {
Free
}
}
impl Segment {
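/// Returns the number of items currently counted as live
/// in this segment: `present` minus `removed`, written so
/// that the subtraction can never underflow.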
fn len(&self) -> usize {
std::cmp::max(self.present.len(), self.removed.len())
- self.removed.len()
}
fn _is_free(&self) -> bool {
match self.state {
Free => true,
_ => false,
}
}
fn is_inactive(&self) -> bool {
match self.state {
Inactive => true,
_ => false,
}
}
fn _is_active(&self) -> bool {
match self.state {
Active => true,
_ => false,
}
}
fn is_draining(&self) -> bool {
match self.state {
Draining => true,
_ => false,
}
}
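/// Transitions a Free segment to Active, clearing its
/// bookkeeping sets and pinning it to the provided lsn.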
fn free_to_active(&mut self, new_lsn: Lsn) {
trace!(
"setting Segment to Active with new lsn {:?}, was {:?}",
new_lsn,
self.lsn
);
assert_eq!(self.state, Free);
self.present.clear();
self.removed.clear();
self.deferred_remove.clear();
self.deferred_rm_blob.clear();
self.lsn = Some(new_lsn);
self.state = Active;
}
/// Transitions a segment to being in the Inactive state.
fn active_to_inactive(
&mut self,
lsn: Lsn,
from_recovery: bool,
config: &Config,
) -> Result<(), ()> {
trace!(
"setting Segment with lsn {:?} to Inactive",
self.lsn()
);
assert_eq!(self.state, Active);
if from_recovery {
assert!(lsn >= self.lsn());
} else {
assert_eq!(self.lsn.unwrap(), lsn);
}
self.state = Inactive;
// now we can push any deferred removals to the removed set
let deferred =
mem::replace(&mut self.deferred_remove, HashSet::new());
for pid in deferred {
self.remove_pid(pid, lsn);
}
let deferred_rm_blob =
mem::replace(&mut self.deferred_rm_blob, HashSet::new());
for ptr in deferred_rm_blob {
trace!(
"removing blob {} while transitioning \
segment lsn {:?} to Inactive",
ptr,
self.lsn,
);
remove_blob(ptr, config).map_err(|e| e.danger_cast())?;
}
Ok(())
}
fn inactive_to_draining(&mut self, lsn: Lsn) {
trace!(
"setting Segment with lsn {:?} to Draining",
self.lsn()
);
assert_eq!(self.state, Inactive);
assert!(lsn >= self.lsn());
self.state = Draining;
}
fn draining_to_free(&mut self, lsn: Lsn) {
trace!("setting Segment with lsn {:?} to Free", self.lsn());
assert!(self.is_draining());
assert!(lsn >= self.lsn());
self.present.clear();
self.removed.clear();
self.state = Free;
}
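/// Ensures that a segment being rebuilt from a snapshot is
/// Active at the given base lsn, re-activating it if the
/// snapshot reports a newer lsn than the one recorded here.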
fn recovery_ensure_initialized(&mut self, lsn: Lsn) {
if let Some(current_lsn) = self.lsn {
if current_lsn != lsn {
assert!(lsn > current_lsn);
trace!(
"(snapshot) recovering segment with base lsn {}",
lsn
);
self.state = Free;
self.free_to_active(lsn);
}
} else {
trace!(
"(snapshot) recovering segment with base lsn {}",
lsn
);
self.free_to_active(lsn);
}
}
fn lsn(&self) -> Lsn {
self.lsn.unwrap()
}
/// Add a pid to the Segment. The caller must provide
/// the Segment's LSN.
fn insert_pid(&mut self, pid: PageId, lsn: Lsn) {
assert_eq!(lsn, self.lsn.unwrap());
// if this breaks, we didn't implement the transition
// logic right in write_to_log, and maybe a thread is
// using the SA to add pids AFTER their calls to
// res.complete() worked.
// FIXME Free, called from Tree::new -> replace -> mark_replace -> mark_link
// FIXME Inactive, called from Tree::del -> path_for_key -> pc::get -> pc::page_in ->
// pc::cas_page -> sa::free_segment -> sa::possibly_clean_or_free_segment ->
// Segment::draining_to_free
assert_eq!(
self.state, Active,
"expected segment with lsn {} to be Active",
lsn
);
assert!(!self.removed.contains(&pid));
self.present.insert(pid);
}
/// Mark that a pid in this Segment has been relocated.
/// The caller must provide the LSN of the removal.
fn remove_pid(&mut self, pid: PageId, lsn: Lsn) {
// TODO this could be racy?
assert!(lsn >= self.lsn.unwrap());
match self.state {
Active => {
// we have received a removal before
// transferring this segment to Inactive, so
// we defer this pid's removal until the transfer.
self.deferred_remove.insert(pid);
}
Inactive | Draining => {
self.present.remove(&pid);
self.removed.insert(pid);
}
Free => panic!("remove_pid called on a Free Segment"),
}
}
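/// Removes a blob that was referred-to from this segment,
/// either immediately or deferred until the segment becomes
/// Inactive, depending on the segment's current state.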
fn remove_blob(
&mut self,
blob_ptr: BlobPointer,
config: &Config,
) -> Result<(), ()> {
match self.state {
Active => {
// we have received a removal before
// transferring this segment to Inactive, so
// we defer this pid's removal until the transfer.
self.deferred_rm_blob.insert(blob_ptr);
}
Inactive | Draining => {
trace!(
"directly removing blob {} that was referred-to \
in a segment that has already been marked as Inactive \
or Draining.",
blob_ptr,
);
remove_blob(blob_ptr, config)
.map_err(|e| e.danger_cast())?;
}
Free => panic!("remove_blob called on a Free Segment"),
}
Ok(())
}
fn live_pct(&self) -> f64 {
let total = self.present.len() + self.removed.len();
self.present.len() as f64 / total as f64
}
fn can_free(&self) -> bool {
self.state == Draining && self.is_empty()
}
fn is_empty(&self) -> bool {
self.present.is_empty()
}
}
impl SegmentAccountant {
/// Create a new SegmentAccountant from previously recovered segments.
pub(super) fn start<R>(
config: Config,
snapshot: Snapshot<R>,
) -> Result<SegmentAccountant, ()> {
let mut ret = SegmentAccountant {
config: config,
segments: vec![],
clean_counter: 0,
free: Arc::new(Mutex::new(VecDeque::new())),
tip: 0,
to_clean: BTreeSet::new(),
pause_rewriting: false,
safety_buffer: vec![],
ordering: BTreeMap::new(),
};
if let SegmentMode::Linear = ret.config.segment_mode {
// this is a hack to prevent segments from being overwritten
// when operating without a `PageCache`
ret.pause_rewriting();
}
if snapshot.last_lid > ret.tip {
let io_buf_size = ret.config.io_buf_size;
let last_idx = snapshot.last_lid / io_buf_size as LogId;
let new_idx = last_idx + 1;
let new_tip = new_idx * io_buf_size as LogId;
ret.tip = new_tip;
}
ret.set_safety_buffer(snapshot.max_lsn)?;
ret.initialize_from_snapshot(snapshot)?;
Ok(ret)
}
fn initialize_from_snapshot<R>(
&mut self,
snapshot: Snapshot<R>,
) -> Result<(), ()> {
let io_buf_size = self.config.io_buf_size;
// generate segments from snapshot lids
let mut segments = vec![];
let add =
|pid, lsn, lid: LogId, segments: &mut Vec<Segment>| {
// add pid to segment
let idx = lid as usize / io_buf_size;
if segments.len() < idx + 1 {
segments.resize(idx + 1, Segment::default());
}
let segment_lsn =
lsn / io_buf_size as Lsn * io_buf_size as Lsn;
segments[idx]
.recovery_ensure_initialized(segment_lsn);
segments[idx].insert_pid(pid, segment_lsn);
};
for (pid, state) in snapshot.pt {
match state {
PageState::Present(coords) => {
for (lsn, ptr) in coords {
add(pid, lsn, ptr.lid(), &mut segments);
}
}
PageState::Allocated(lsn, ptr)
| PageState::Free(lsn, ptr) => {
add(pid, lsn, ptr.lid(), &mut segments);
}
}
}
for (idx, pids) in snapshot.replacements {
if segments.len() <= idx {
// segment doesn't have pids anyway,
// and will be marked as free later
continue;
}
if let Some(segment_lsn) = segments[idx].lsn {
for (pid, lsn) in pids {
if lsn < segment_lsn {
// TODO is this avoidable? can punt more
// work to snapshot generation logic.
trace!(
"stale removed pid {} with lsn {}, on segment {} with current lsn: {:?}",
pid,
lsn,
idx,
segments[idx].lsn
);
} else {
segments[idx].remove_pid(pid, lsn);
}
}
}
}
self.initialize_from_segments(segments)
}
fn initialize_from_segments(
&mut self,
mut segments: Vec<Segment>,
) -> Result<(), ()> {
// populate ordering from segments.
// use last segment as active even if it's full
let io_buf_size = self.config.io_buf_size;
let highest_lsn = segments.iter().fold(0, |acc, segment| {
std::cmp::max(acc, segment.lsn.unwrap_or(acc))
});
debug!(
"recovered highest_lsn in all segments: {}",
highest_lsn
);
// NB set tip BEFORE any calls to free_segment, because
// ensuring the segment safety discipline will bump the
// tip, which should already be the final recovered tip.
self.tip = (io_buf_size * segments.len()) as LogId;
// we need to make sure that we raise the tip over any
// segments that are in the safety_buffer. The safety_buffer
// may contain segments that are beyond what we have tracked
// in self.segments, because they may have been fully replaced
// in a later segment, or the next one, but they still need
// to be in the safety buffer, in order to prevent us from
// zeroing and recycling something in the safety buffer, breaking
// the recovery of later segments if a tear is discovered.
if self.tip != 0 {
for &lid in &self.safety_buffer {
if self.tip <= lid {
self.tip = lid + io_buf_size as LogId;
}
}
}
debug!("set self.tip to {}", self.tip);
for (idx, ref mut segment) in segments.iter_mut().enumerate()
{
let segment_start = idx as LogId * io_buf_size as LogId;
if segment.lsn.is_none() {
self.free_segment(segment_start, true);
continue;
}
let lsn = segment.lsn();
if lsn != highest_lsn {
segment.active_to_inactive(
lsn,
true,
&self.config,
)?;
}
self.ordering.insert(lsn, segment_start);
// can we transition these segments?
// we calculate the cleanup threshold in a skewed way,
// which encourages earlier segments to be rewritten
// more frequently.
let base_cleanup_threshold =
self.config.segment_cleanup_threshold;
let cleanup_skew = self.config.segment_cleanup_skew;
let relative_prop = if self.segments.is_empty() {
0.5
} else {
idx as f64 / self.segments.len() as f64
};
// we bias to having a higher threshold closer to segment 0
let inverse_prop = 1. - relative_prop;
let relative_threshold =
cleanup_skew as f64 * inverse_prop;
let computed_threshold =
base_cleanup_threshold + relative_threshold;
// We should always be below 1, or we will rewrite everything
let cleanup_threshold = if computed_threshold < 1. {
computed_threshold
} else {
0.99
};
let segment_low_pct =
segment.live_pct() <= cleanup_threshold;
let segment_low_count = (segment.len() as f64)
< MINIMUM_ITEMS_PER_SEGMENT as f64
* cleanup_threshold;
let can_free = segment.is_empty()
&& !self.pause_rewriting
&& lsn != highest_lsn;
let can_drain = (segment_low_pct || segment_low_count)
&& !self.pause_rewriting
&& lsn != highest_lsn;
// populate free and to_clean based on whether the segment
// can be freed immediately or should be drained
if can_free {
// can be reused immediately
if segment.state == Active {
segment.active_to_inactive(
lsn,
true,
&self.config,
)?;
}
if segment.state == Inactive {
segment.inactive_to_draining(lsn);
}
self.to_clean.remove(&segment_start);
trace!(
"pid {} freed @initialize_from_snapshot",
segment_start
);
segment.draining_to_free(lsn);
trace!(
"freeing segment {} from initialize_from_snapshot, tip: {}",
segment_start,
self.tip
);
self.free_segment(segment_start, true);
} else if can_drain {
// hack! the pause_rewriting check above exists to support
// raw logs, which are created with it set to true.
// can be cleaned
trace!(
"setting segment {} to Draining from initialize_from_snapshot",
segment_start
);
if segment.state == Active {
segment.active_to_inactive(
lsn,
true,
&self.config,
)?;
}
segment.inactive_to_draining(lsn);
self.to_clean.insert(segment_start);
self.free
.lock()
.unwrap()
.retain(|&(s, _)| s != segment_start);
} else {
self.free
.lock()
.unwrap()
.retain(|&(s, _)| s != segment_start);
}
}
trace!("initialized self.segments to {:?}", segments);
self.segments = segments;
if self.segments.is_empty() {
// this is basically just for when we recover with a single
// empty-yet-initialized segment
debug!(
"recovered no segments so not initializing from any",
);
}
Ok(())
}
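/// Zeroes out the headers of any segments written after the
/// snapshot's max lsn, then seeds the safety buffer with the
/// last `io_bufs` segments in lsn order.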
fn set_safety_buffer(
&mut self,
snapshot_max_lsn: Lsn,
) -> Result<(), ()> {
self.ensure_ordering_initialized()?;
// if our ordering contains anything higher than
// what our snapshot logic scanned, that segment is
// empty, and we should nuke it to prevent incorrect
// recoveries.
let mut to_zero = vec![];
for (&lsn, &lid) in &self.ordering {
if lsn <= snapshot_max_lsn {
continue;
}
warn!(
"zeroing out empty segment header at lsn {} lid {}",
lsn, lid
);
to_zero.push(lsn);
let f = self.config.file()?;
maybe_fail!("zero garbage segment");
f.pwrite_all(&*vec![EVIL_BYTE; SEG_HEADER_LEN], lid)?;
f.sync_all()?;
maybe_fail!("zero garbage segment post");
}
for lsn in to_zero.into_iter() {
self.ordering.remove(&lsn);
}
let safety_buffer_len = self.config.io_bufs;
let mut safety_buffer: Vec<LogId> = self
.ordering
.iter()
.rev()
.take(safety_buffer_len)
.map(|(_lsn, lid)| *lid)
.collect();
// we want the things written last to be last in this Vec
safety_buffer.reverse();
while safety_buffer.len() < safety_buffer_len {
safety_buffer.insert(0, 0);
}
self.safety_buffer = safety_buffer;
Ok(())
}
fn free_segment(&mut self, lid: LogId, in_recovery: bool) {
debug!("freeing segment {}", lid);
debug!("safety_buffer before free: {:?}", self.safety_buffer);
debug!("free list before free {:?}", self.free);
let idx = self.lid_to_idx(lid);
assert_eq!(self.segments[idx].state, Free);
assert!(
!self.segment_in_free(lid),
"double-free of a segment occurred"
);
// we depend on the invariant that the last segments
// always link together, so that we can detect torn
// segments during recovery.
self.ensure_safe_free_distance();
if in_recovery {
self.free.lock().unwrap().push_back((lid, false));
// We only want to immediately remove the segment
// mapping if we're in recovery because otherwise
// we may be acting on updates relating to things
// in IO buffers, before they have been flushed.
// Outside of recovery, the mapping is removed in
// the next() method, just before the segment is reused.
if let Some(old_lsn) = self.segments[idx].lsn {
trace!(
"removing segment {} with lsn {} from ordering",
lid,
old_lsn
);
self.ordering.remove(&old_lsn);
}
} else {
// We use a `epoch::Guard::defer()` to ensure that we never
// add a segment's LogId to the free deque while any
// active thread could be acting on it. This is necessary
// despite the "safe buffer" in the free queue because
// the safe buffer only prevents the sole remaining
// copy of a page from being overwritten. This prevents
// dangling references to segments that were rewritten after
// the `LogId` was read. Additionally, we guarantee that
// any reservations that have replaced items in this segment
// have already been fsynced, due to the EBR guard
// that is attached to any reservation before operating
// on a segment.
//
// We spawn a thread to accomplish this because we
// are already holding a lock to the segment accountant,
// and when the guard that is created below is dropped
// it may cause the segment accountant to be locked again.
// We can't have the same thread that is already holding
// the SA lock try to lock it again, or we deadlock.
let free = self.free.clone();
rayon::spawn(move || {
let guard = pin();
guard.defer(move || {
free.lock().unwrap().push_back((lid, false));
});
guard.flush();
});
}
}
/// Causes all new allocations to occur at the end of the file, which
/// is necessary to preserve consistency while concurrently iterating through
/// the log during snapshot creation.
pub(super) fn pause_rewriting(&mut self) {
self.pause_rewriting = true;
}
/// Re-enables segment rewriting after iteration is complete.
pub(super) fn resume_rewriting(&mut self) {
self.pause_rewriting = false;
}
/// Called by the `PageCache` when a page has been rewritten completely.
/// We mark all of the old segments that contained the previous state
/// of the page, and if the old segments are empty or clear enough to
/// begin accelerated cleaning we mark them as such.
pub(super) fn mark_replace(
&mut self,
pid: PageId,
lsn: Lsn,
old_ptrs: Vec<DiskPtr>,
new_ptr: DiskPtr,
) -> Result<(), ()> {
trace!(
"mark_replace pid {} from ptrs {:?} to ptr {} with lsn {}",
pid,
old_ptrs,
new_ptr,
lsn
);
let new_idx =
new_ptr.lid() as usize / self.config.io_buf_size;
// make sure we're not actively trying to replace the destination
let new_segment_start =
new_idx as LogId * self.config.io_buf_size as LogId;
self.to_clean.remove(&new_segment_start);
// Do we need to schedule any blob cleanups?
// Not if we just moved the pointer without changing
// the underlying blob, as is the case when the old
// location was a single blob pointer and nothing else.
let schedule_rm_blob =
!(old_ptrs.len() == 1 && old_ptrs[0].is_blob());
for old_ptr in old_ptrs {
if schedule_rm_blob && old_ptr.is_blob() {
trace!(
"queueing blob removal for {} in our own segment",
old_ptr
);
self.segments[new_idx]
.remove_blob(old_ptr.blob().1, &self.config)?;
}
let old_lid = old_ptr.lid();
let old_idx = self.lid_to_idx(old_lid);
if new_idx == old_idx {
// we probably haven't flushed this segment yet, so don't
// mark the pid as being removed from it
continue;
}
if self.segments[old_idx].lsn() > lsn {
// has been replaced after this call already,
// quite a big race happened
// TODO think about how this happens with our segment delay
continue;
}
if self.segments[old_idx].state == Free {
// this segment is already reused
// TODO should this be a panic?
continue;
}
self.segments[old_idx].remove_pid(pid, lsn);
// can we transition these segments?
self.possibly_clean_or_free_segment(old_idx, lsn);
}
self.mark_link(pid, lsn, new_ptr);
Ok(())
}
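/// Checks whether a segment has become empty or has fallen
/// below the (skewed) cleanup threshold, advancing it toward
/// Draining or Free and updating `to_clean` accordingly.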
fn possibly_clean_or_free_segment(
&mut self,
idx: usize,
lsn: Lsn,
) {
// we calculate the cleanup threshold in a skewed way,
// which encourages earlier segments to be rewritten
// more frequently.
let base_cleanup_threshold =
self.config.segment_cleanup_threshold;
let cleanup_skew = self.config.segment_cleanup_skew;
assert!(!self.segments.is_empty());
let relative_prop = idx as f64 / self.segments.len() as f64;
// we bias to having a higher threshold closer to segment 0
let inverse_prop = 1. - relative_prop;
let relative_threshold = cleanup_skew as f64 * inverse_prop;
let computed_threshold =
base_cleanup_threshold + relative_threshold;
// We should always be below 1, or we will rewrite everything
let cleanup_threshold = if computed_threshold < 1. {
computed_threshold
} else {
0.99
};
let segment_start = (idx * self.config.io_buf_size) as LogId;
let segment_low_pct =
self.segments[idx].live_pct() <= cleanup_threshold;
let segment_low_count = (self.segments[idx].len() as f64)
< MINIMUM_ITEMS_PER_SEGMENT as f64 * cleanup_threshold;
let can_drain = self.segments[idx].is_inactive()
&& (segment_low_pct || segment_low_count);
if can_drain {
// can be cleaned
trace!(
"SA inserting {} into to_clean from possibly_clean_or_free_segment",
segment_start
);
self.segments[idx].inactive_to_draining(lsn);
self.to_clean.insert(segment_start);
}
if self.segments[idx].can_free() {
// can be reused immediately
self.segments[idx].draining_to_free(lsn);
self.to_clean.remove(&segment_start);
trace!(
"freed segment {} in possibly_clean_or_free_segment",
segment_start
);
self.free_segment(segment_start, false);
}
}
/// Called by the `PageCache` to find pages that are in
/// segments eligible for cleaning that it should
/// try to rewrite elsewhere.
pub(super) fn clean(
&mut self,
ignore_pid: Option<PageId>,
) -> Option<PageId> {
let item = self.to_clean.iter().nth(0).cloned();
if let Some(lid) = item {
let idx = self.lid_to_idx(lid);
let segment = &self.segments[idx];
assert_eq!(segment.state, Draining);
if segment.present.is_empty() {
// This could legitimately be empty if it's completely
// filled with failed flushes.
return None;
}
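// rotate through this segment's resident pids so that
// repeated calls spread rewriting work across pages
// instead of always returning the same one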
self.clean_counter += 1;
let offset = self.clean_counter % segment.present.len();
let pid = segment.present.iter().nth(offset).unwrap();
if Some(*pid) == ignore_pid {
return None;
}
trace!(
"telling caller to clean {} from segment at {}",
pid,
lid,
);
return Some(*pid);
}
None
}
/// Called from `PageCache` when some state has been added
/// to a logical page at a particular offset. We ensure the
/// page is present in the segment's page set.
pub(super) fn mark_link(
&mut self,
pid: PageId,
lsn: Lsn,
ptr: DiskPtr,
) {
trace!("mark_link pid {} at ptr {}", pid, ptr);
let idx = self.lid_to_idx(ptr.lid());
// make sure we're not actively trying to replace the destination
let new_segment_start =
idx as LogId * self.config.io_buf_size as LogId;
self.to_clean.remove(&new_segment_start);
let segment = &mut self.segments[idx];
let segment_lsn = lsn / self.config.io_buf_size as Lsn
* self.config.io_buf_size as Lsn;
// if this assert fails, a race happened and our Lsn no longer applies
assert_eq!(
segment.lsn(),
segment_lsn,
"segment somehow got reused by the time a link was \
marked on it. expected lsn: {} actual: {}",
segment_lsn,
segment.lsn()
);
segment.insert_pid(pid, segment_lsn);
}
/// Called after the trailer of a segment has been written to disk,
/// indicating that no more pids will be added to a segment. Moves
/// the segment into the Inactive state.
///
/// # Panics
/// The provided lsn and lid must exactly match the existing segment.
pub(super) fn deactivate_segment(
&mut self,
lsn: Lsn,
lid: LogId,
) -> Result<(), ()> {
let idx = self.lid_to_idx(lid);
self.segments[idx].active_to_inactive(
lsn,
false,
&self.config,
)?;
self.possibly_clean_or_free_segment(idx, lsn);
Ok(())
}
fn bump_tip(&mut self) -> LogId {
let lid = self.tip;
self.tip += self.config.io_buf_size as LogId;
trace!("advancing file tip from {} to {}", lid, self.tip);
lid
}
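/// Ensures the free queue holds at least `io_bufs` segments
/// by pushing freshly allocated segments from the file tip
/// onto its front.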
fn ensure_safe_free_distance(&mut self) {
// NB If updates always have to wait in a queue
// at least as long as the number of IO buffers, it
// guarantees that the old updates are actually safe
// somewhere else first. Note that we push_front here
// so that the log tip is used first.
// This is so that we will never give out a segment
// that has been placed on the free queue after its
// contained pages have all had updates added to an
// IO buffer during a PageCache replace, but whose
// replacing updates have not actually landed on disk
// yet.
while self.free.lock().unwrap().len() < self.config.io_bufs {
let new_lid = self.bump_tip();
trace!(
"pushing segment {} to free from ensure_safe_free_distance",
new_lid
);
self.free.lock().unwrap().push_front((new_lid, true));
}
}
/// Returns the next offset at which to write a new segment.
pub(super) fn next(&mut self, lsn: Lsn) -> Result<LogId, ()> {
assert_eq!(
lsn % self.config.io_buf_size as Lsn,
0,
"unaligned Lsn provided to next!"
);
// pop free or add to end
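// When rewriting is paused we always allocate at the tip.
// Otherwise we pop from the free queue; if the popped
// segment sits at the very end of the file and truncation
// is not prohibited, we shrink the file and try again, and
// if the queue is empty we fall back to bumping the tip.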
let lid = if self.pause_rewriting {
self.bump_tip()
} else {
loop {
let pop_res = self.free.lock().unwrap().pop_front();
if let Some((
next,
pushed_by_ensure_safe_free_distance,
)) = pop_res
{
// it's safe for us to truncate only
// if we don't dig into the safety buffer
// on our next pop
let next_next_in_safety_buffer = self
.free
.lock()
.unwrap()
.get(0)
.cloned()
.map(|(lid, _)| {
self.safety_buffer.contains(&lid)
}).unwrap_or(false);
// this will only be in safety_buffer if it's the last
// element
let truncate_prohibited =
pushed_by_ensure_safe_free_distance
|| next_next_in_safety_buffer;
if truncate_prohibited {
break next;
}
// if we just returned the last segment
// in the file, shrink the file.
let io_buf_size =
self.config.io_buf_size as LogId;
if next + io_buf_size == self.tip {
self.truncate(next)?;
} else {
break next;
}
} else {
break self.bump_tip();
}
}
};
debug!(
"zeroing out segment beginning at {} for future lsn {}",
lid, lsn
);
let f = self.config.file()?;
maybe_fail!("zero segment");
f.pwrite_all(
&*vec![EVIL_BYTE; self.config.io_buf_size],
lid,
)?;
f.sync_all()?;
maybe_fail!("zero segment post");
let last_given = self.safety_buffer[self.config.io_bufs - 1];
// pin lsn to this segment
let idx = self.lid_to_idx(lid);
assert_eq!(self.segments[idx].state, Free);
// remove the old ordering from our list
if let Some(old_lsn) = self.segments[idx].lsn {
self.ordering.remove(&old_lsn);
}
self.segments[idx].free_to_active(lsn);
self.ordering.insert(lsn, lid);
debug!(
"segment accountant returning offset: {} \
paused: {} last: {} on deck: {:?}",
lid, self.pause_rewriting, last_given, self.free
);
if lid == 0 {
let all_zeroes =
self.safety_buffer == vec![0; self.config.io_bufs];
let no_zeroes = !self.safety_buffer.contains(&0);
assert!(
all_zeroes || no_zeroes,
"SA returning 0, and we expected \
the safety buffer to either be all zeroes, or contain no other \
zeroes, but it was {:?}",
self.safety_buffer
);
} else {
assert!(
!self.safety_buffer.contains(&lid),
"giving away segment {} that is in the safety buffer {:?}",
lid,
self.safety_buffer
);
}
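// rotate the safety buffer: the segment we are handing out
// becomes the newest entry and the oldest entry falls out
// of the protected window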
self.safety_buffer.push(lid);
self.safety_buffer.remove(0);
Ok(lid)
}
/// Returns an iterator over a snapshot of current segment
/// log sequence numbers and their corresponding file offsets.
pub(super) fn segment_snapshot_iter_from(
&mut self,
lsn: Lsn,
) -> Box<dyn Iterator<Item = (Lsn, LogId)>> {
assert!(
self.pause_rewriting,
"must pause rewriting before \
iterating over segments"
);
let segment_len = self.config.io_buf_size as Lsn;
let normalized_lsn = lsn / segment_len * segment_len;
Box::new(
self.ordering
.clone()
.into_iter()
.filter(move |&(l, _)| l >= normalized_lsn),
)
}
// truncate the file to the desired length
fn truncate(&mut self, at: LogId) -> Result<(), ()> {
assert_eq!(
at % self.config.io_buf_size as LogId,
0,
"new length must be io-buf-len aligned"
);
if self.safety_buffer.contains(&at) {
panic!(
"file tip {} to be truncated is in the safety buffer {:?}",
at,
self.safety_buffer
);
}
self.tip = at;
assert!(
!self.segment_in_free(at),
"double-free of a segment occurred"
);
debug!("truncating file to length {}", at);
let f = self.config.file()?;
f.set_len(at)?;
f.sync_all().map_err(|e| e.into())
}
fn ensure_ordering_initialized(&mut self) -> Result<(), ()> {
if !self.ordering.is_empty() {
return Ok(());
}
self.ordering = scan_segment_lsns(0, &self.config)?;
debug!("initialized ordering to {:?}", self.ordering);
Ok(())
}
fn lid_to_idx(&mut self, lid: LogId) -> usize {
let idx = lid as usize / self.config.io_buf_size;
// TODO never resize like this, make it a single
// responsibility when the tip is bumped / truncated.
if self.segments.len() < idx + 1 {
self.segments.resize(idx + 1, Segment::default());
}
idx
}
fn segment_in_free(&self, lid: LogId) -> bool {
let free = self.free.lock().unwrap();
for &(seg_lid, _) in &*free {
if seg_lid == lid {
return true;
}
}
false
}
}
// Scan the log file when we don't know of any Lsn offsets yet,
// recovering the order of segments and the highest Lsn.
fn scan_segment_lsns(
min: Lsn,
config: &Config,
) -> Result<BTreeMap<Lsn, LogId>, ()> {
let mut ordering = BTreeMap::new();
let segment_len = config.io_buf_size as LogId;
let mut cursor = 0;
let f = config.file()?;
while let Ok(segment) = f.read_segment_header(cursor) {
// in the future this can be optimized to just read
// the initial header at that position... but we need to
// make sure the segment is not torn
trace!(
"SA scanned header at lid {} during startup: {:?}",
cursor,
segment
);
if segment.ok
&& (segment.lsn != 0 || cursor == 0)
&& segment.lsn >= min
{
// if lsn is 0, this is free
assert!(
!ordering.contains_key(&segment.lsn),
"duplicate segment LSN {} detected at both {} and {}, \
one should have been zeroed out during recovery",
segment.lsn,
ordering[&segment.lsn],
cursor
);
ordering.insert(segment.lsn, cursor);
}
cursor += segment_len;
}
debug!("ordering before clearing tears: {:?}", ordering);
// Check that the last <# io buffers> segments properly
// link their previous segment pointers.
Ok(clean_tail_tears(ordering, config, &f))
}
// This ensures that the last <# io buffers> segments on
// disk connect via their previous segment pointers in
// the header. This is important because we expect that
// the last <# io buffers> segments will join up, and we
// never reuse buffers within this safety range.
fn clean_tail_tears(
mut ordering: BTreeMap<Lsn, LogId>,
config: &Config,
f: &File,
) -> BTreeMap<Lsn, LogId> {
let safety_buffer = config.io_bufs;
let logical_tail: Vec<Lsn> = ordering
.iter()
.rev()
.take(safety_buffer)
.map(|(lsn, _lid)| *lsn)
.collect();
let io_buf_size = config.io_buf_size;
let mut tear_at = None;
// make sure the last <# io_bufs> segments are contiguous
for window in logical_tail.windows(2) {
if window[0] != window[1] + io_buf_size as Lsn {
error!(
"detected torn segment somewhere after {}",
window[1]
);
tear_at = Some(window[1]);
}
}
// if any segment doesn't have a proper trailer, invalidate
// everything after it, since we can't preserve linearizability
// for segments after a tear.
for (&lsn, &lid) in &ordering {
let trailer_lid =
lid + io_buf_size as LogId - SEG_TRAILER_LEN as LogId;
let expected_trailer_lsn =
lsn + io_buf_size as Lsn - SEG_TRAILER_LEN as Lsn;
let trailer_res = f.read_segment_trailer(trailer_lid);
if trailer_res.is_err() {
// trailer could not be read
debug!(
"could not read trailer of segment starting at {}",
lid
);
if let Some(existing_tear) = tear_at {
if existing_tear > lsn {
tear_at = Some(lsn);
}
} else {
tear_at = Some(lsn);
}
break;
}
let trailer = trailer_res.unwrap();
if !trailer.ok
|| trailer.lsn != expected_trailer_lsn
|| (lsn == 0 && lid != 0)
{
// trailer's checksum failed, or
// the lsn is outdated, or
// the lsn is 0 but the lid isn't 0 (zeroed segment)
debug!(
"tear detected at expected trailer lsn {} header lsn {} \
lid {} for trailer {:?}",
expected_trailer_lsn, lsn, lid, trailer
);
if let Some(existing_tear) = tear_at {
if existing_tear > lsn {
tear_at = Some(lsn);
}
} else {
tear_at = Some(lsn);
}
}
}
if let Some(tear) = tear_at {
// we need to chop off the elements after the tear
debug!(
"filtering out segments after detected tear at {}",
tear
);
for (&lsn, &lid) in &ordering {
if lsn > tear {
error!(
"filtering out segment with lsn {} at lid {}",
lsn, lid
);
f.pwrite_all(&*vec![EVIL_BYTE; SEG_HEADER_LEN], lid)
.expect(
"should be able to mark a linear-orphan \
segment as invalid",
);
f.sync_all().expect(
"should be able to sync data \
file after purging linear-orphan",
);
}
}
ordering = ordering
.into_iter()
.filter(|&(lsn, _lid)| lsn <= tear)
.collect();
}
ordering
}
/// The log may be configured to write data
/// in several different ways, depending on
/// the constraints of the system using it.
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub enum SegmentMode {
/// Write to the end of the log, always.
Linear,
/// Keep track of segment utilization, and
/// reuse segments when their contents are
/// fully relocated elsewhere.
/// Will try to copy data out of segments
/// once they reach a configurable threshold.
Gc,
}
pub(super) fn raw_segment_iter_from(
lsn: Lsn,
config: &Config,
) -> Result<LogIter, ()> {
let segment_len = config.io_buf_size as Lsn;
let normalized_lsn = lsn / segment_len * segment_len;
let ordering = scan_segment_lsns(0, &config)?;
trace!(
"generated iterator over segments {:?} with lsn >= {}",
ordering,
normalized_lsn
);
let segment_iter = Box::new(
ordering
.into_iter()
.filter(move |&(l, _)| l >= normalized_lsn),
);
Ok(LogIter {
config: config.clone(),
max_lsn: std::i64::MAX,
cur_lsn: SEG_HEADER_LEN as Lsn,
segment_base: None,
segment_iter: segment_iter,
segment_len: config.io_buf_size,
use_compression: config.use_compression,
trailer: None,
})
}