use std::{
collections::{BTreeMap, BTreeSet},
fs::File,
mem,
};
use self::reader::LogReader;
use futures::{future::Future, oneshot, Oneshot};
use super::*;
/// A `SegmentAccountant` is an allocator for equally-sized chunks
/// ("segments") of the storage file: it tracks the lifecycle state of
/// each segment, the pages living in it, which segments are free for
/// reuse, and the lsn -> file-offset ordering of segments.
#[derive(Debug)]
pub(super) struct SegmentAccountant {
    // configuration handle; also provides the file and thread pool
    config: Config,
    // per-segment metadata, indexed by (lid / io_buf_size)
    segments: Vec<Segment>,
    // rotates which cleaning candidate `clean` picks next
    clean_counter: usize,
    // segment base offsets available for reuse. The bool is `true` for
    // entries inserted by `ensure_safe_free_distance` as padding —
    // TODO confirm the flag's exact meaning with consuming code.
    free: BTreeMap<LogId, bool>,
    // the next file offset to hand out when no free segment is usable
    tip: LogId,
    // base offsets of segments that contain pages awaiting rewriting
    to_clean: FastSet8<LogId>,
    // when true, `next` never reuses segments and only extends the tip
    pause_rewriting: bool,
    // the last `io_bufs` segment offsets handed out; these are never
    // reused or truncated while they may still be referenced
    safety_buffer: Vec<LogId>,
    // segment base lsn -> segment base file offset
    ordering: BTreeMap<Lsn, LogId>,
    // pending async file-truncation results, drained in `bump_tip`
    async_truncations: Vec<Oneshot<Result<()>>>,
}
#[cfg(feature = "event_log")]
impl Drop for SegmentAccountant {
    /// With the "event_log" feature enabled, report the final
    /// (state, live %, offset) of every ordered segment to the event
    /// log on shutdown, so it can be compared with the state observed
    /// after restart (see `segments_after_restart`).
    fn drop(&mut self) {
        let segments: std::collections::HashMap<
            Lsn,
            (SegmentState, u8, LogId),
        > = self
            .ordering
            // cloned because `lid_to_idx` below requires `&mut self`
            .clone()
            .into_iter()
            .map(|(lsn, lid)| {
                let id = self.lid_to_idx(lid);
                let segment = &self.segments[id];
                let live = segment.live_pct();
                let state = segment.state.clone();
                (lsn, (state, live, lid))
            })
            .collect();
        self.config.event_log.segments_before_restart(segments);
    }
}
/// Per-segment bookkeeping: which pages currently live in the
/// segment, which have been logically removed from it, and work that
/// is deferred until the segment is deactivated.
#[derive(Default, Clone, Debug, PartialEq, Serialize, Deserialize)]
struct Segment {
    // pages whose data lives in this segment
    present: BTreeSet<PageId>,
    // pages that were relocated or freed out of this segment
    removed: FastSet8<PageId>,
    // blob pointers to remove once this segment leaves Active
    deferred_rm_blob: FastSet8<BlobPointer>,
    // (pid, old segment index) replacements to apply at deactivation
    deferred_replacements: FastSet8<(PageId, SegmentId)>,
    // base lsn of the segment's current incarnation, if ever activated
    lsn: Option<Lsn>,
    state: SegmentState,
}
/// The lifecycle state of a segment. The transition methods on
/// `Segment` enforce the cycle
/// Free -> Active -> Inactive -> Draining -> Free.
#[derive(
    Debug,
    Copy,
    Eq,
    Hash,
    Ord,
    PartialOrd,
    PartialEq,
    Clone,
    Serialize,
    Deserialize,
)]
pub(crate) enum SegmentState {
    /// Not in use; eligible to be handed out by `next`.
    Free,
    /// Currently receiving new writes.
    Active,
    /// Fully written; pages may only be removed, not added.
    Inactive,
    /// Being emptied of live pages so it can be freed.
    Draining,
}
use self::SegmentState::*;
impl Default for SegmentState {
fn default() -> SegmentState {
Free
}
}
impl Segment {
fn len(&self) -> usize {
std::cmp::max(self.present.len(), self.removed.len())
- self.removed.len()
}
fn is_free(&self) -> bool {
self.state == Free
}
fn is_inactive(&self) -> bool {
match self.state {
Inactive => true,
_ => false,
}
}
fn _is_active(&self) -> bool {
match self.state {
Active => true,
_ => false,
}
}
fn is_draining(&self) -> bool {
match self.state {
Draining => true,
_ => false,
}
}
fn free_to_active(&mut self, new_lsn: Lsn) {
trace!(
"setting Segment to Active with new lsn {:?}, was {:?}",
new_lsn,
self.lsn
);
assert_eq!(self.state, Free);
self.present.clear();
self.removed.clear();
self.deferred_rm_blob.clear();
self.deferred_replacements.clear();
self.lsn = Some(new_lsn);
self.state = Active;
}
fn active_to_inactive(
&mut self,
lsn: Lsn,
from_recovery: bool,
config: &Config,
) -> Result<FastSet8<(PageId, usize)>> {
trace!("setting Segment with lsn {:?} to Inactive", self.lsn());
assert_eq!(self.state, Active);
if from_recovery {
assert!(lsn >= self.lsn());
} else {
assert_eq!(self.lsn.unwrap(), lsn);
}
self.state = Inactive;
let deferred_rm_blob =
mem::replace(&mut self.deferred_rm_blob, FastSet8::default());
for ptr in deferred_rm_blob {
trace!(
"removing blob {} while transitioning \
segment lsn {:?} to Inactive",
ptr,
self.lsn,
);
remove_blob(ptr, config)?;
}
let deferred_replacements =
mem::replace(&mut self.deferred_replacements, FastSet8::default());
Ok(deferred_replacements)
}
fn inactive_to_draining(&mut self, lsn: Lsn) {
trace!("setting Segment with lsn {:?} to Draining", self.lsn());
assert_eq!(self.state, Inactive);
assert!(lsn >= self.lsn());
self.state = Draining;
}
fn draining_to_free(&mut self, lsn: Lsn) {
trace!("setting Segment with lsn {:?} to Free", self.lsn());
assert!(self.is_draining());
assert!(lsn >= self.lsn());
self.present.clear();
self.removed.clear();
self.state = Free;
}
fn recovery_ensure_initialized(&mut self, lsn: Lsn) {
if let Some(current_lsn) = self.lsn {
if current_lsn != lsn {
assert!(lsn > current_lsn);
trace!("(snapshot) recovering segment with base lsn {}", lsn);
self.state = Free;
self.free_to_active(lsn);
}
} else {
trace!("(snapshot) recovering segment with base lsn {}", lsn);
self.free_to_active(lsn);
}
}
fn lsn(&self) -> Lsn {
self.lsn.unwrap()
}
fn insert_pid(&mut self, pid: PageId, lsn: Lsn) {
assert_eq!(lsn, self.lsn.unwrap());
assert_eq!(
self.state, Active,
"expected segment with lsn {} to be Active",
lsn
);
assert!(!self.removed.contains(&pid));
self.present.insert(pid);
}
fn remove_pid(&mut self, pid: PageId, lsn: Lsn, in_recovery: bool) {
assert!(lsn >= self.lsn.unwrap());
match self.state {
Active => {
if !in_recovery {
panic!("remove_pid called on Active segment");
}
assert!(
!self.present.contains(&pid),
"did not expect present to contain pid {} during recovery",
pid,
);
self.removed.insert(pid);
}
Inactive | Draining => {
self.present.remove(&pid);
self.removed.insert(pid);
}
Free => panic!("remove_pid called on a Free Segment"),
}
}
fn defer_replace_pids(
&mut self,
deferred: FastSet8<(PageId, usize)>,
lsn: Lsn,
) {
assert!(lsn >= self.lsn.unwrap());
for item in deferred.into_iter() {
self.deferred_replacements.insert(item);
}
}
fn remove_blob(
&mut self,
blob_ptr: BlobPointer,
config: &Config,
) -> Result<()> {
match self.state {
Active => {
self.deferred_rm_blob.insert(blob_ptr);
}
Inactive | Draining => {
trace!(
"directly removing blob {} that was referred-to \
in a segment that has already been marked as Inactive \
or Draining.",
blob_ptr,
);
remove_blob(blob_ptr, config)?;
}
Free => panic!("remove_blob called on a Free Segment"),
}
Ok(())
}
fn live_pct(&self) -> u8 {
let total = self.present.len() + self.removed.len();
if total == 0 {
return 100;
}
let live = self.present.len() * 100 / total;
assert!(live <= 100);
live as u8
}
fn can_free(&self) -> bool {
self.state == Draining && self.is_empty()
}
fn is_empty(&self) -> bool {
self.present.is_empty()
}
}
impl SegmentAccountant {
/// Build a `SegmentAccountant` from a config and a recovered
/// snapshot, rebuilding all in-memory segment metadata.
pub(super) fn start(
    config: Config,
    snapshot: Snapshot,
) -> Result<SegmentAccountant> {
    let mut ret = SegmentAccountant {
        config,
        segments: vec![],
        clean_counter: 0,
        free: BTreeMap::default(),
        tip: 0,
        to_clean: FastSet8::default(),
        pause_rewriting: false,
        safety_buffer: vec![],
        ordering: BTreeMap::new(),
        async_truncations: Vec::new(),
    };

    // Linear mode never reuses segments, so rewriting stays paused.
    if let SegmentMode::Linear = ret.config.segment_mode {
        ret.pause_rewriting();
    }

    // Only initialize from the snapshot if it covers at least one
    // segment header's worth of log and has a non-empty page table.
    if snapshot.max_lsn >= SEG_HEADER_LEN as Lsn && !snapshot.pt.is_empty()
    {
        ret.set_safety_buffer(snapshot.max_lsn)?;
        ret.initialize_from_snapshot(snapshot)?;
    } else {
        trace!(
            "skipping initialization of SA \
             for snapshot with max_lsn {} and pt {:?}",
            snapshot.max_lsn,
            snapshot.pt,
        );
    }

    Ok(ret)
}
/// Rebuild per-segment page bookkeeping from the snapshot's page
/// table, then apply the replacements the snapshot recorded, and
/// finally derive the accountant's file-level state.
fn initialize_from_snapshot(&mut self, snapshot: Snapshot) -> Result<()> {
    let io_buf_size = self.config.io_buf_size;
    let mut segments = vec![];

    // record that `pid` was written at `lsn`/`lid`, growing the
    // segment vector on demand
    let add = |pid, lsn, lid: LogId, segments: &mut Vec<Segment>| {
        let idx = lid as usize / io_buf_size;
        if segments.len() < idx + 1 {
            segments.resize(idx + 1, Segment::default());
        }
        trace!(
            "adding lsn: {} lid: {} for pid {} to segment {} during SA recovery",
            lsn,
            lid,
            pid,
            idx
        );
        // round the write's lsn down to its segment's base lsn
        let segment_lsn = lsn / io_buf_size as Lsn * io_buf_size as Lsn;
        segments[idx].recovery_ensure_initialized(segment_lsn);
        segments[idx].insert_pid(pid, segment_lsn);
    };

    for (pid, state) in snapshot.pt {
        match state {
            PageState::Present(coords) => {
                for (lsn, ptr) in coords {
                    add(pid, lsn, ptr.lid(), &mut segments);
                }
            }
            PageState::Free(lsn, ptr) => {
                add(pid, lsn, ptr.lid(), &mut segments);
            }
        }
    }

    // visit segments in ascending base-lsn order and apply the
    // replacements the snapshot recorded against them
    let mut ordering = segments
        .iter()
        .enumerate()
        .map(|(i, s)| (i, s.lsn))
        .collect::<Vec<_>>();
    ordering.sort_unstable_by_key(|(_idx, lsn)| *lsn);

    for (idx, _lsn) in ordering.into_iter() {
        let (replacement_lsn, replacements) =
            if let Some(r) = snapshot.replacements.get(&idx) {
                r
            } else {
                continue;
            };

        for &(old_pid, old_idx) in replacements {
            let replaced_current_lsn =
                segments.get(old_idx).and_then(|s| s.lsn);

            // skip replacements that refer to a segment incarnation
            // newer than the replacement itself
            if replaced_current_lsn.unwrap_or(Lsn::max_value())
                > *replacement_lsn
            {
                continue;
            }

            segments[old_idx].remove_pid(old_pid, *replacement_lsn, true);
        }
    }

    self.initialize_from_segments(segments)
}
/// Second stage of recovery: derive the file tip, the lsn -> lid
/// ordering, and the free / to_clean sets from the rebuilt segment
/// vector, advancing each segment's state machine as appropriate.
fn initialize_from_segments(
    &mut self,
    mut segments: Vec<Segment>,
) -> Result<()> {
    let io_buf_size = self.config.io_buf_size;
    let highest_lsn = segments
        .iter()
        .filter_map(|s| s.lsn)
        .filter(|lsn| *lsn != Lsn::max_value())
        .max()
        .unwrap_or(0);
    debug!("recovered highest_lsn in all segments: {}", highest_lsn);

    // the tip must land beyond every segment in the safety buffer
    for &lid in &self.safety_buffer {
        if self.tip <= lid {
            self.tip = lid + io_buf_size as LogId;
        }
    }
    debug!("set self.tip to {} based on safety buffer", self.tip);

    let segments_len = segments.len();
    let max_lsn = Lsn::max_value();

    for (idx, ref mut segment) in segments.iter_mut().enumerate() {
        let segment_start = idx as LogId * io_buf_size as LogId;

        // the tip must also land beyond every recovered segment
        if segment_start >= self.tip {
            self.tip = segment_start + io_buf_size as LogId;
            debug!(
                "set self.tip to {} based on encountered segment",
                self.tip
            );
        }

        if let Some(lsn) = segment.lsn {
            // every Active segment except the most recent one is
            // deactivated during recovery
            if lsn != highest_lsn && segment.state == Active {
                segment.active_to_inactive(lsn, true, &self.config)?;
            }
            self.ordering.insert(lsn, segment_start);
        }

        let can_free = segment.lsn.is_none() || segment.is_empty();
        let can_drain = segment_is_drainable(
            idx,
            segments_len,
            segment.live_pct(),
            segment.len(),
            &self.config,
        );

        if self.pause_rewriting || segment.lsn == Some(highest_lsn) {
            // never reuse the most recently written segment, and
            // never reuse anything while rewriting is paused
            self.free.remove(&segment_start);
        } else if can_free {
            // walk the remaining state transitions so the segment
            // is Free before it is pushed onto the free list
            if segment.state == Inactive {
                segment.inactive_to_draining(max_lsn);
                segment.draining_to_free(max_lsn);
            } else {
                assert_eq!(segment.state, Free);
            }
            self.to_clean.remove(&segment_start);
            trace!(
                "freeing segment {} from initialize_from_snapshot, tip: {}",
                segment_start,
                self.tip
            );
            if !self.free.contains_key(&segment_start) {
                self.free_segment(segment_start, true);
            } else {
                trace!(
                    "skipped freeing of segment {} \
                     because it was already in free list",
                    segment_start,
                );
            }
        } else if can_drain {
            trace!(
                "setting segment {} to Draining from initialize_from_snapshot",
                segment_start
            );
            segment.inactive_to_draining(max_lsn);
            self.to_clean.insert(segment_start);
            self.free.remove(&segment_start);
        } else {
            self.free.remove(&segment_start);
        }
    }

    trace!("initialized self.segments to {:?}", segments);
    self.segments = segments;

    if self.segments.is_empty() {
        debug!("recovered no segments so not initializing from any",);
    }

    // mirror of the Drop reporting, taken just after restart
    #[cfg(feature = "event_log")]
    {
        let segments: std::collections::HashMap<
            Lsn,
            (SegmentState, u8, LogId),
        > = self
            .ordering
            .iter()
            .map(|(&lsn, &lid)| {
                let id = lid as usize / self.config.io_buf_size;
                let segment = &self.segments[id];
                let live = segment.live_pct();
                let state = segment.state.clone();
                (lsn, (state, live, lid))
            })
            .collect();
        self.config.event_log.segments_after_restart(segments);
    }

    Ok(())
}
/// Zero out the header of any segment whose lsn lies above the
/// snapshot's max lsn (such segments are not covered by the
/// snapshot), then seed the safety buffer with the last `io_bufs`
/// remaining segments in ascending lsn order.
fn set_safety_buffer(&mut self, snapshot_max_lsn: Lsn) -> Result<()> {
    self.ensure_ordering_initialized()?;

    let mut to_zero = vec![];
    for (&lsn, &lid) in &self.ordering {
        assert_ne!(lsn, Lsn::max_value());
        if lsn <= snapshot_max_lsn {
            continue;
        }
        warn!(
            "zeroing out empty segment header for segment \
             above snapshot_max_lsn {} at lsn {} lid {}",
            snapshot_max_lsn, lsn, lid
        );
        to_zero.push(lsn);
        let f = &self.config.file;
        maybe_fail!("zero garbage segment");
        // overwrite the on-disk header so the segment is skipped by
        // future recoveries
        f.pwrite_all(&*vec![EVIL_BYTE; SEG_HEADER_LEN], lid)?;
        f.sync_all()?;
        maybe_fail!("zero garbage segment post");
    }

    for lsn in to_zero.into_iter() {
        self.ordering.remove(&lsn);
    }

    // take the `io_bufs` highest-lsn segments, then reverse so the
    // safety buffer ends with the most recent one
    let safety_buffer_len = self.config.io_bufs;
    let mut safety_buffer: Vec<LogId> = self
        .ordering
        .iter()
        .rev()
        .take(safety_buffer_len)
        .map(|(_lsn, lid)| *lid)
        .collect();

    safety_buffer.reverse();
    self.safety_buffer = safety_buffer;

    Ok(())
}
/// Push a `Free`-state segment onto the free list, padding the free
/// list first if the segment is still inside the safety buffer.
fn free_segment(&mut self, lid: LogId, in_recovery: bool) {
    debug!("freeing segment {}", lid);
    debug!("safety_buffer before free: {:?}", self.safety_buffer);
    debug!("free list before free {:?}", self.free);

    let idx = self.lid_to_idx(lid);
    assert!(
        self.tip > lid,
        "freed a segment above our current file tip, \
         please report this bug!"
    );
    assert_eq!(self.segments[idx].state, Free);
    assert!(
        !self.segment_in_free(lid),
        "double-free of a segment occurred"
    );

    self.ensure_safe_free_distance(lid);

    if in_recovery {
        // during recovery, also drop the stale lsn -> lid mapping
        // for this segment's prior incarnation
        if let Some(old_lsn) = self.segments[idx].lsn {
            trace!(
                "removing segment {} with lsn {} from ordering",
                lid,
                old_lsn
            );
            self.ordering.remove(&old_lsn);
        }
    }

    self.free.insert(lid, false);
}
/// Stop reusing free segments; `next` will only extend the file tip
/// until `resume_rewriting` is called.
pub(super) fn pause_rewriting(&mut self) {
    self.pause_rewriting = true;
}
/// Re-enable reuse of free segments — unless the configured mode is
/// `Linear`, which never reuses segments.
pub(super) fn resume_rewriting(&mut self) {
    if self.config.segment_mode != SegmentMode::Linear {
        self.pause_rewriting = false;
    }
}
/// Account for a page being completely rewritten at `new_ptr`:
/// records the new location via `mark_link` and defers removal of
/// the page from its old segments until the new segment deactivates.
pub(super) fn mark_replace(
    &mut self,
    pid: PageId,
    lsn: Lsn,
    old_ptrs: Vec<DiskPtr>,
    new_ptr: DiskPtr,
) -> Result<()> {
    let _measure = Measure::new(&M.accountant_mark_replace);
    trace!(
        "mark_replace pid {} from ptrs {:?} to ptr {} with lsn {}",
        pid,
        old_ptrs,
        new_ptr,
        lsn
    );
    let new_idx = new_ptr.lid() as usize / self.config.io_buf_size;

    let new_segment_start =
        new_idx as LogId * self.config.io_buf_size as LogId;
    assert!(!self.to_clean.contains(&new_segment_start));

    // NOTE(review): blob removal is skipped only when the replaced
    // state was exactly one blob pointer — presumably that blob is
    // carried forward by the replacement; confirm with callers.
    let schedule_rm_blob = !(old_ptrs.len() == 1 && old_ptrs[0].is_blob());

    let mut deferred_replacements = FastSet8::default();

    for old_ptr in old_ptrs {
        let old_lid = old_ptr.lid();
        // LogId::max_value() appears to act as a "no inline
        // location" sentinel — nothing to account for; confirm
        if old_lid == LogId::max_value() {
            continue;
        }

        if schedule_rm_blob && old_ptr.is_blob() {
            trace!(
                "queueing blob removal for {} in our own segment",
                old_ptr
            );
            self.segments[new_idx]
                .remove_blob(old_ptr.blob().1, &self.config)?;
        }

        let old_idx = self.lid_to_idx(old_lid);

        // replacement landed in the same segment; nothing to defer
        if new_idx == old_idx {
            continue;
        }

        if self.segments[old_idx].lsn() > lsn {
            panic!(
                "mark_replace called on previous version of segment. \
                 this means it was reused while other threads still \
                 had references to it."
            );
        }

        if self.segments[old_idx].state == Free {
            panic!(
                "mark_replace called on Free segment with lid {}. \
                 this means it was dropped while other threads still had \
                 references to it.",
                old_idx * self.config.io_buf_size
            );
        }

        deferred_replacements.insert((pid, old_idx));
    }

    self.segments[new_idx].defer_replace_pids(deferred_replacements, lsn);

    self.mark_link(pid, lsn, new_ptr);

    Ok(())
}
/// After pages have been removed from a segment, advance its state:
/// Inactive -> Draining when drainable, and Draining -> Free once no
/// live pages remain.
fn possibly_clean_or_free_segment(&mut self, idx: usize, lsn: Lsn) {
    let can_drain = segment_is_drainable(
        idx,
        self.segments.len(),
        self.segments[idx].live_pct(),
        self.segments[idx].len(),
        &self.config,
    ) && self.segments[idx].is_inactive();

    let segment_start = (idx * self.config.io_buf_size) as LogId;

    if can_drain {
        trace!(
            "SA inserting {} into to_clean from possibly_clean_or_free_segment",
            segment_start
        );
        self.segments[idx].inactive_to_draining(lsn);
        self.to_clean.insert(segment_start);
    }

    // note: the drain above may already have emptied the segment,
    // in which case it is freed immediately
    if self.segments[idx].can_free() {
        self.segments[idx].draining_to_free(lsn);
        self.to_clean.remove(&segment_start);
        trace!(
            "freed segment {} in possibly_clean_or_free_segment",
            segment_start
        );
        self.free_segment(segment_start, false);
    }
}
/// Suggest a page for the caller to rewrite out of a segment being
/// cleaned. `clean_counter` rotates both the chosen segment and the
/// chosen page across calls; returns `None` when there is no
/// candidate, or when the chosen page equals `ignore_pid`.
pub(super) fn clean(&mut self, ignore_pid: PageId) -> Option<PageId> {
    let seg_offset = if self.to_clean.is_empty() || self.to_clean.len() == 1
    {
        0
    } else {
        self.clean_counter % self.to_clean.len()
    };

    let item = self.to_clean.iter().nth(seg_offset).cloned();
    if let Some(lid) = item {
        let idx = self.lid_to_idx(lid);
        let segment = &self.segments[idx];
        assert!(segment.state == Draining || segment.state == Inactive);

        // nothing live left in this segment to relocate
        if segment.present.is_empty() {
            return None;
        }

        self.clean_counter += 1;

        let offset = if segment.present.len() == 1 {
            0
        } else {
            self.clean_counter % segment.present.len()
        };

        let pid = segment.present.iter().nth(offset).unwrap();
        if *pid == ignore_pid {
            return None;
        }
        trace!("telling caller to clean {} from segment at {}", pid, lid,);
        return Some(*pid);
    }

    None
}
/// Account for a page being linked (appended) into the segment that
/// contains `ptr`. The segment must still be in the incarnation the
/// write was issued against (its base lsn must match).
pub(super) fn mark_link(&mut self, pid: PageId, lsn: Lsn, ptr: DiskPtr) {
    let _measure = Measure::new(&M.accountant_mark_link);
    trace!("mark_link pid {} at ptr {}", pid, ptr);
    let idx = self.lid_to_idx(ptr.lid());

    let new_segment_start = idx as LogId * self.config.io_buf_size as LogId;
    assert!(!self.to_clean.contains(&new_segment_start));

    let segment = &mut self.segments[idx];

    // round the write's lsn down to its segment's base lsn
    let segment_lsn = lsn / self.config.io_buf_size as Lsn
        * self.config.io_buf_size as Lsn;

    assert_eq!(
        segment.lsn(),
        segment_lsn,
        "segment somehow got reused by the time a link was \
         marked on it. expected lsn: {} actual: {}",
        segment_lsn,
        segment.lsn()
    );

    segment.insert_pid(pid, segment_lsn);
}
/// Transition the segment written at `lsn` from Active to Inactive,
/// apply its deferred replacements against the segments the replaced
/// pages used to live in, and possibly schedule cleaning when free
/// space is plentiful.
pub(super) fn deactivate_segment(&mut self, lsn: Lsn) -> Result<()> {
    let lid = self.ordering[&lsn];
    let idx = self.lid_to_idx(lid);

    let replacements =
        self.segments[idx].active_to_inactive(lsn, false, &self.config)?;

    for &(pid, old_idx) in &replacements {
        let old_segment = &mut self.segments[old_idx];

        assert_ne!(
            old_segment.state,
            Active,
            "segment {} is processing pid {} replacements for \
             old segment {}, which is in the Active state. \
             all replacements for pid: {:?}",
            lid,
            pid,
            old_idx * self.config.io_buf_size,
            replacements
                .iter()
                .filter(|(p, _)| p == &pid)
                .collect::<Vec<_>>()
        );

        assert_ne!(
            old_segment.state,
            Free,
            "segment {} is processing pid {} replacements for \
             segment {}, which is in the Free state. \
             all replacements for pid: {:?}",
            lid,
            pid,
            old_idx * self.config.io_buf_size,
            replacements
                .iter()
                .filter(|(p, _)| p == &pid)
                .collect::<Vec<_>>()
        );

        debug_assert!(
            old_segment.present.contains(&pid),
            "we expect deferred replacements to provide \
             all previous segments so we can clean them. \
             pid {} old_ptr segment: {} segments with pid: {:?}",
            pid,
            old_idx * self.config.io_buf_size,
            self.segments
                .iter()
                .enumerate()
                .filter(|(_, s)| s.present.contains(&pid))
                .map(|(i, s)| (
                    i * self.config.io_buf_size,
                    s.state,
                    s.present.clone(),
                ))
                .collect::<Vec<_>>()
        );

        old_segment.remove_pid(pid, lsn, false);
    }

    self.possibly_clean_or_free_segment(idx, lsn);

    // if the proportion of free segments is high and there are more
    // than a handful of Inactive ones, schedule the last Inactive
    // segment for cleaning
    let free_segs = self.segments.iter().filter(|s| s.is_free()).count();
    let inactive_segs =
        self.segments.iter().filter(|s| s.is_inactive()).count();
    let free_ratio = (free_segs * 100) / (1 + free_segs + inactive_segs);

    if free_ratio >= (self.config.segment_cleanup_threshold * 100.) as usize
        && inactive_segs > 5
    {
        let last_index =
            self.segments.iter().rposition(|s| s.is_inactive()).unwrap();

        let segment_start = last_index * self.config.io_buf_size;

        self.to_clean.insert(segment_start as LogId);
    }

    Ok(())
}
/// Allocate a fresh segment at the end of the file, first draining
/// the results of any asynchronous truncations so completed shrinks
/// are observed before the tip advances again.
fn bump_tip(&mut self) -> LogId {
    let truncations = mem::replace(&mut self.async_truncations, Vec::new());

    for truncation in truncations {
        match truncation.wait() {
            Ok(Ok(())) => {}
            error => {
                // truncation is best-effort; a failure only wastes
                // file space, so it is logged and ignored
                error!("failed to shrink file: {:?}", error);
            }
        }
    }

    let lid = self.tip;
    self.tip += self.config.io_buf_size as LogId;

    trace!("advancing file tip from {} to {}", lid, self.tip);

    lid
}
/// If the segment being freed is still in the safety buffer, pad the
/// free list with freshly-allocated segments from the file tip until
/// the free list is deeper than the segment's position in the safety
/// buffer, so the freed segment cannot be handed out again while it
/// may still be referenced.
fn ensure_safe_free_distance(&mut self, lid: LogId) {
    // `position` returns an owned `usize`; the original bound a
    // reference to the temporary Option and re-dereferenced it.
    if let Some(idx) = self.safety_buffer.iter().position(|l| *l == lid) {
        while self.free.len() <= idx {
            let new_lid = self.bump_tip();
            assert!(
                new_lid > lid,
                "we freed segment {} while self.tip of {} \
                 was somehow below it",
                lid,
                new_lid,
            );
            debug!(
                "pushing segment {} to free from ensure_safe_free_distance",
                new_lid
            );
            self.free.insert(new_lid, true);
        }
    }
}
/// Return the base file offset for the segment that will hold data
/// beginning at (segment-aligned) `lsn`, either by reusing a free
/// segment or by extending the file tip. Also opportunistically
/// truncates free segments off the end of the file, and maintains
/// the safety buffer of the last `io_bufs` handed-out segments.
pub(super) fn next(&mut self, lsn: Lsn) -> Result<LogId> {
    let _measure = Measure::new(&M.accountant_next);

    assert_eq!(
        lsn % self.config.io_buf_size as Lsn,
        0,
        "unaligned Lsn provided to next!"
    );

    // shrink the file by popping free (non-padding, non-safety-
    // buffer) segments off its end
    loop {
        if self.tip == 0 {
            break;
        }
        let last_segment = self.tip - self.config.io_buf_size as LogId;
        if self.free.get(&last_segment) == Some(&false)
            && !self.safety_buffer.contains(&last_segment)
        {
            self.free.remove(&last_segment);
            self.truncate(last_segment)?;
        } else {
            break;
        }
    }

    // pick the first free segment that is not in the safety buffer
    // (idiomatic `find` replaces filter().take(1).cloned().nth(0))
    let safe = self
        .free
        .keys()
        .find(|l| !self.safety_buffer.contains(l))
        .cloned();

    let lid = if self.pause_rewriting || safe.is_none() {
        self.bump_tip()
    } else {
        let next = safe.unwrap();
        self.free.remove(&next);
        next
    };

    let idx = self.lid_to_idx(lid);
    assert_eq!(self.segments[idx].state, Free);

    // drop the ordering entry for the segment's prior incarnation
    if let Some(old_lsn) = self.segments[idx].lsn {
        self.ordering.remove(&old_lsn);
    }

    self.segments[idx].free_to_active(lsn);
    self.ordering.insert(lsn, lid);

    debug!(
        "segment accountant returning offset: {} \
         paused: {} last: {:?} on deck: {:?}, \
         safety_buffer: {:?}",
        lid,
        self.pause_rewriting,
        self.safety_buffer.last(),
        self.free,
        self.safety_buffer
    );

    // sanity checks: never hand out a segment that is still in the
    // safety buffer
    if lid == 0 {
        let no_zeroes = !self.safety_buffer.contains(&0);
        assert!(
            self.safety_buffer.is_empty() || no_zeroes,
            "SA returning 0, and we expected \
             the safety buffer to either be empty, or contain no other \
             zeroes, but it was {:?}",
            self.safety_buffer
        );
    } else {
        assert!(
            !self.safety_buffer.contains(&lid),
            "giving away segment {} that is in the safety buffer {:?}",
            lid,
            self.safety_buffer
        );
    }

    // slide the safety buffer window forward over the new segment
    self.safety_buffer.push(lid);
    if self.safety_buffer.len() > self.config.io_bufs {
        self.safety_buffer.remove(0);
    }
    assert!(self.safety_buffer.len() <= self.config.io_bufs);

    Ok(lid)
}
/// Iterate over the (lsn, lid) pairs of all segments whose base lsn
/// is at or above the segment containing `lsn`. Requires rewriting
/// to be paused so segments cannot be reused mid-iteration.
pub(super) fn segment_snapshot_iter_from(
    &mut self,
    lsn: Lsn,
) -> Box<dyn Iterator<Item = (Lsn, LogId)>> {
    if let Err(e) = self.ensure_ordering_initialized() {
        error!("failed to load segment ordering: {:?}", e);
    }

    assert!(
        self.pause_rewriting,
        "must pause rewriting before \
         iterating over segments"
    );

    // round `lsn` down to its segment's base lsn
    let segment_len = self.config.io_buf_size as Lsn;
    let normalized_lsn = lsn / segment_len * segment_len;

    Box::new(
        self.ordering
            // cloned so the iterator does not borrow `self`
            .clone()
            .into_iter()
            .filter(move |&(l, _)| l >= normalized_lsn),
    )
}
/// Shrink the file to length `at` (a segment-aligned offset):
/// asynchronously on the configured thread pool when one exists
/// (results are harvested later in `bump_tip`), synchronously
/// otherwise.
fn truncate(&mut self, at: LogId) -> Result<()> {
    assert_eq!(
        at % self.config.io_buf_size as LogId,
        0,
        "new length must be io-buf-len aligned"
    );

    if self.safety_buffer.contains(&at) {
        panic!(
            "file tip {} to be truncated is in the safety buffer {:?}",
            at, self.safety_buffer
        );
    }

    self.tip = at;

    assert!(
        !self.segment_in_free(at),
        "double-free of a segment occurred"
    );

    if let Some(ref thread_pool) = self.config.thread_pool {
        trace!("asynchronously truncating file to length {}", at);
        let (completer, oneshot) = oneshot();

        let f = self.config.file.clone();
        thread_pool.spawn(move || {
            debug!("truncating file to length {}", at);
            let res = f
                .set_len(at)
                .and_then(|_| f.sync_all())
                .map_err(|e| e.into());
            if let Err(e) = completer.send(res) {
                error!("failed to fill async truncation future: {:?}", e);
            }
        });

        // the result will be awaited in `bump_tip`
        self.async_truncations.push(oneshot);

        Ok(())
    } else {
        trace!("synchronously truncating file to length {}", at);
        let f = &self.config.file;
        f.set_len(at)?;
        f.sync_all()?;
        Ok(())
    }
}
/// Lazily populate `self.ordering` by scanning segment headers on
/// disk, if it has not been initialized already.
fn ensure_ordering_initialized(&mut self) -> Result<()> {
    if self.ordering.is_empty() {
        self.ordering = scan_segment_lsns(0, &self.config)?;
        debug!("initialized ordering to {:?}", self.ordering);
    }
    Ok(())
}
/// Convert a segment base offset into an index into `self.segments`,
/// growing the vector with default (Free) entries as needed.
fn lid_to_idx(&mut self, lid: LogId) -> usize {
    assert_ne!(lid, LogId::max_value());
    let idx = lid as usize / self.config.io_buf_size;
    if idx >= self.segments.len() {
        self.segments.resize(idx + 1, Segment::default());
    }
    idx
}
/// Whether the segment beginning at `lid` is already on the free list.
fn segment_in_free(&self, lid: LogId) -> bool {
    self.free.contains_key(&lid)
}
}
/// Scan every segment header in the file, building a map from each
/// segment's claimed base lsn to its file offset, then strip any
/// torn writes off the logical tail via `clean_tail_tears`.
fn scan_segment_lsns(
    min: Lsn,
    config: &Config,
) -> Result<BTreeMap<Lsn, LogId>> {
    let mut ordering = BTreeMap::new();

    let segment_len = config.io_buf_size as LogId;
    let mut cursor = 0;

    let f = &config.file;
    while let Ok(segment) = f.read_segment_header(cursor) {
        trace!(
            "SA scanned header at lid {} during startup: {:?}",
            cursor,
            segment
        );
        // only the segment at offset 0 may claim lsn 0; elsewhere an
        // lsn of 0 presumably indicates an unwritten header — confirm
        if segment.ok && (segment.lsn != 0 || cursor == 0) && segment.lsn >= min
        {
            assert_ne!(segment.lsn, Lsn::max_value());
            assert!(
                !ordering.contains_key(&segment.lsn),
                "duplicate segment LSN {} detected at both {} and {}, \
                 one should have been zeroed out during recovery",
                segment.lsn,
                ordering[&segment.lsn],
                cursor
            );
            ordering.insert(segment.lsn, cursor);
        }
        cursor += segment_len;
    }

    debug!("ordering before clearing tears: {:?}", ordering);

    clean_tail_tears(ordering, config, &f)
}
/// Drop segments that lie beyond a torn (partially persisted) write:
/// a gap in the logical tail's lsn sequence, a missing or mismatched
/// segment trailer, or a non-zero-offset segment claiming lsn 0.
/// Orphaned segment headers past the tear are zeroed on disk so they
/// are skipped by future recoveries; a tear deeper than `io_bufs`
/// segments is treated as corruption.
fn clean_tail_tears(
    mut ordering: BTreeMap<Lsn, LogId>,
    config: &Config,
    f: &File,
) -> Result<BTreeMap<Lsn, LogId>> {
    let safety_buffer = config.io_bufs;
    // the last `io_bufs` segment lsns, in descending order
    let logical_tail: Vec<Lsn> = ordering
        .iter()
        .rev()
        .take(safety_buffer)
        .map(|(lsn, _lid)| *lsn)
        .collect();

    let io_buf_size = config.io_buf_size;

    let mut tear_at = None;

    // detect a gap in the descending, segment-spaced lsn sequence
    for window in logical_tail.windows(2) {
        if window[0] != window[1] + io_buf_size as Lsn {
            error!("detected torn segment somewhere after {}", window[1]);
            tear_at = Some(window[1]);
        }
    }

    // verify each segment's trailer, keeping the earliest tear found
    for (&lsn, &lid) in &ordering {
        let trailer_lid = lid + io_buf_size as LogId - SEG_TRAILER_LEN as LogId;
        let expected_trailer_lsn =
            lsn + io_buf_size as Lsn - SEG_TRAILER_LEN as Lsn;
        let trailer_res = f.read_segment_trailer(trailer_lid);
        if trailer_res.is_err() {
            debug!("could not read trailer of segment starting at {}", lid);
            if let Some(existing_tear) = tear_at {
                if existing_tear > lsn {
                    tear_at = Some(lsn);
                }
            } else {
                tear_at = Some(lsn);
            }
            break;
        }
        let trailer = trailer_res.unwrap();

        if !trailer.ok
            || trailer.lsn != expected_trailer_lsn
            || (lsn == 0 && lid != 0)
        {
            debug!(
                "tear detected at expected trailer lsn {} header lsn {} \
                 lid {} for trailer {:?}",
                expected_trailer_lsn, lsn, lid, trailer
            );
            if let Some(existing_tear) = tear_at {
                if existing_tear > lsn {
                    tear_at = Some(lsn);
                }
            } else {
                tear_at = Some(lsn);
            }
        }
    }

    if let Some(tear) = tear_at {
        debug!("filtering out segments after detected tear at {}", tear);

        let tears = ordering
            .iter()
            .filter(|&(lsn, _lid)| *lsn > tear)
            .collect::<Vec<_>>();

        // a tear further back than `io_bufs` segments cannot be the
        // result of a normal crash — treat as corruption
        if tears.len() > config.io_bufs {
            error!(
                "encountered corruption in the middle \
                 of the database file, before the \
                 section that would be impacted by \
                 a normal crash. tear at lsn {} \
                 segments after that: {:?}",
                tear, tears,
            );
            return Err(Error::Corruption {
                at: DiskPtr::Inline(ordering[&tear]),
            });
        }

        // zero out the on-disk headers of the torn segments
        for (&lsn, &lid) in &tears {
            if lsn > tear {
                error!("filtering out segment with lsn {} at lid {}", lsn, lid);
                f.pwrite_all(&*vec![EVIL_BYTE; SEG_HEADER_LEN], lid).expect(
                    "should be able to mark a linear-orphan \
                     segment as invalid",
                );
                f.sync_all().expect(
                    "should be able to sync data \
                     file after purging linear-orphan",
                );
            }
        }

        ordering = ordering
            .into_iter()
            .filter(|&(lsn, _lid)| lsn <= tear)
            .collect();
    }

    Ok(ordering)
}
/// The segment allocation strategy for the log file.
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub enum SegmentMode {
    /// Never reuse segments; always append new ones at the file tip
    /// (rewriting stays permanently paused).
    Linear,
    /// Allow freed segments to be reclaimed and reused.
    Gc,
}
/// Build a `LogIter` over all on-disk segments whose base lsn is at
/// or above the segment containing `lsn`, by scanning segment
/// headers directly from the file.
pub(super) fn raw_segment_iter_from(
    lsn: Lsn,
    config: &Config,
) -> Result<LogIter> {
    // round `lsn` down to its segment's base lsn
    let segment_len = config.io_buf_size as Lsn;
    let normalized_lsn = lsn / segment_len * segment_len;

    let ordering = scan_segment_lsns(0, &config)?;

    trace!(
        "generated iterator over segments {:?} with lsn >= {}",
        ordering,
        normalized_lsn
    );

    let segment_iter = Box::new(
        ordering
            .into_iter()
            .filter(move |&(l, _)| l >= normalized_lsn),
    );

    Ok(LogIter {
        config: config.clone(),
        max_lsn: std::i64::MAX,
        cur_lsn: SEG_HEADER_LEN as Lsn,
        segment_base: None,
        segment_iter,
        trailer: None,
    })
}
/// Decide whether a segment should be drained (its live pages
/// relocated so it can eventually be freed).
///
/// The liveness threshold is skewed by the segment's position in the
/// file: segments with a lower index receive a larger bonus and so
/// qualify at higher liveness. A segment also qualifies when it
/// holds very few items.
fn segment_is_drainable(
    idx: usize,
    num_segments: usize,
    live_pct: u8,
    len: usize,
    config: &Config,
) -> bool {
    let base_cleanup_threshold =
        (config.segment_cleanup_threshold * 100.) as usize;
    let cleanup_skew = config.segment_cleanup_skew;

    // this segment's position in the file, as a percentage
    let relative_prop = if num_segments == 0 {
        50
    } else {
        (idx * 100) / num_segments
    };

    // earlier segments get a larger skew bonus
    let skewed_bonus = cleanup_skew * (100 - relative_prop) / 100;

    // clamp into 1..=99 so the division below never sees zero and a
    // fully-live segment never qualifies on liveness alone
    let cleanup_threshold =
        (base_cleanup_threshold + skewed_bonus).max(1).min(99);

    let low_liveness = live_pct as usize <= cleanup_threshold;
    let low_item_count =
        len < MINIMUM_ITEMS_PER_SEGMENT * 100 / cleanup_threshold;

    low_liveness || low_item_count
}