use std::{collections::BTreeMap, mem};
use super::*;
#[derive(Debug)]
pub(super) struct SegmentAccountant {
    // System configuration: segment (io buffer) size, backing file,
    // cleanup thresholds, segment mode, etc.
    config: Config,
    // Per-segment metadata, indexed by `lid / io_buf_size`
    // (see `lid_to_idx`, which grows this vector on demand).
    segments: Vec<Segment>,
    // Monotonic counter used to rotate which `to_clean` segment (and which
    // pid within it) gets handed out by `clean`.
    clean_counter: usize,
    // Base offsets of segments that are Free and available for reuse
    // by `next`.
    free: VecSet<LogId>,
    // File offset one segment past the highest segment ever handed out;
    // `bump_tip` advances it and `truncate` pulls it back.
    tip: LogId,
    // Highest segment-aligned lsn passed to `stabilize`; starts at -1 so
    // the first stabilization is never skipped.
    max_stabilized_lsn: Lsn,
    // Base offsets of Draining/Inactive segments whose remaining pages
    // should be rewritten elsewhere so the segment can be freed.
    to_clean: VecSet<LogId>,
    // When true, `next` always extends the file tip instead of reusing
    // freed segments (always true in `SegmentMode::Linear`).
    pause_rewriting: bool,
    // Maps a segment's base lsn to its base lid, giving lsn-ordered
    // iteration over segments (used by `stabilize` and snapshot iteration).
    ordering: BTreeMap<Lsn, LogId>,
    // Promises for in-flight background file truncations; awaited in
    // `bump_tip` before the tip is advanced again.
    async_truncations: Vec<Promise<Result<()>>>,
    // Segments discovered without a recovered lsn during initialization;
    // they are only freed once stabilization passes
    // `deferred_free_segments_after`.
    deferred_free_segments: Option<Vec<LogId>>,
    deferred_free_segments_after: Lsn,
}
#[derive(Default, Clone, Debug, PartialEq, Serialize, Deserialize)]
struct Segment {
    // Pages currently resident in this segment.
    present: FastSet8<PageId>,
    // Pages written here that have not yet been superseded by a write in
    // another segment; these are the candidates handed out by `clean`.
    not_yet_replaced: FastSet8<PageId>,
    // Pages that have been removed from this segment.
    removed: FastSet8<PageId>,
    // Blob pointers whose on-disk removal is deferred until this segment
    // transitions Active -> Inactive (see `active_to_inactive`).
    deferred_rm_blob: FastSet8<BlobPointer>,
    // (pid, old segment index) replacements recorded while Active and
    // applied when the segment is deactivated.
    // NOTE(review): the tuple's second element is `SegmentId` here but the
    // rest of the file passes segment indices as `usize` — presumably
    // `SegmentId` is a `usize` alias; confirm at its declaration.
    deferred_replacements: FastSet8<(PageId, SegmentId)>,
    // Base lsn assigned when the segment was last activated; `None` until
    // first use.
    lsn: Option<Lsn>,
    // Position in the Free -> Active -> Inactive -> Draining lifecycle.
    state: SegmentState,
}
// Lifecycle of a segment. Transitions (enforced by asserts in the
// `Segment` methods below) are:
//   Free -> Active        (`free_to_active`)
//   Active -> Inactive    (`active_to_inactive`)
//   Inactive -> Draining  (`inactive_to_draining`)
//   Draining -> Free      (`draining_to_free`)
#[derive(
    Debug,
    Copy,
    Eq,
    Hash,
    Ord,
    PartialOrd,
    PartialEq,
    Clone,
    Serialize,
    Deserialize,
)]
pub(crate) enum SegmentState {
    // Not in use; eligible to be handed out by `SegmentAccountant::next`.
    Free,
    // Currently receiving new writes.
    Active,
    // No longer written to, but still holds live pages.
    Inactive,
    // Being emptied so it can be freed; remaining pages are rewritten
    // elsewhere by the cleaner.
    Draining,
}
use self::SegmentState::{Free, Active, Inactive, Draining};
impl Default for SegmentState {
fn default() -> Self {
Free
}
}
impl Segment {
    /// Number of live items in this segment. The `max` prevents the
    /// subtraction from underflowing when `removed` outnumbers `present`
    /// (equivalent to a saturating subtraction).
    fn len(&self) -> usize {
        std::cmp::max(self.present.len(), self.removed.len())
            - self.removed.len()
    }

    fn is_free(&self) -> bool {
        self.state == Free
    }

    fn is_inactive(&self) -> bool {
        match self.state {
            Inactive => true,
            _ => false,
        }
    }

    // Leading underscore: currently unused, kept alongside the other
    // state predicates.
    fn _is_active(&self) -> bool {
        match self.state {
            Active => true,
            _ => false,
        }
    }

    fn is_draining(&self) -> bool {
        match self.state {
            Draining => true,
            _ => false,
        }
    }

    /// Transition Free -> Active, clearing all bookkeeping sets and
    /// stamping the segment with its new base lsn.
    fn free_to_active(&mut self, new_lsn: Lsn) {
        trace!(
            "setting Segment to Active with new lsn {:?}, was {:?}",
            new_lsn,
            self.lsn
        );
        assert_eq!(self.state, Free);
        self.present.clear();
        self.not_yet_replaced.clear();
        self.removed.clear();
        self.deferred_rm_blob.clear();
        self.deferred_replacements.clear();
        self.lsn = Some(new_lsn);
        self.state = Active;
    }

    /// Transition to Inactive, performing the blob removals that were
    /// deferred while the segment was Active, and handing the deferred
    /// (pid, old segment) replacements back to the caller to apply.
    ///
    /// During recovery the provided `lsn` may exceed the segment's own
    /// lsn; outside recovery it must match exactly.
    fn active_to_inactive(
        &mut self,
        lsn: Lsn,
        from_recovery: bool,
        config: &Config,
    ) -> Result<FastSet8<(PageId, usize)>> {
        trace!("setting Segment with lsn {:?} to Inactive", self.lsn());
        assert!(
            self.state == Active || self.state == Draining,
            "segment {} should have been \
             Active or Draining, before deactivating, but was {:?}",
            self.lsn(),
            self.state
        );
        if from_recovery {
            assert!(lsn >= self.lsn());
        } else {
            assert_eq!(self.lsn.unwrap(), lsn);
        }
        self.state = Inactive;
        // Blob removals were deferred until no further writes could land in
        // this segment; perform them now.
        let deferred_rm_blob =
            mem::replace(&mut self.deferred_rm_blob, FastSet8::default());
        for ptr in deferred_rm_blob {
            trace!(
                "removing blob {} while transitioning \
                 segment lsn {:?} to Inactive",
                ptr,
                self.lsn,
            );
            remove_blob(ptr, config)?;
        }
        let deferred_replacements =
            mem::replace(&mut self.deferred_replacements, FastSet8::default());
        Ok(deferred_replacements)
    }

    /// Transition Inactive -> Draining once this segment has been selected
    /// for cleaning.
    fn inactive_to_draining(&mut self, lsn: Lsn) {
        trace!("setting Segment with lsn {:?} to Draining", self.lsn());
        assert_eq!(
            self.state, Inactive,
            "segment with lsn {:?} should have been \
             Inactive before draining",
            self.lsn
        );
        assert!(lsn >= self.lsn());
        self.state = Draining;
    }

    /// Transition Draining -> Free after the segment has been fully
    /// emptied, clearing page bookkeeping (the lsn is retained).
    fn draining_to_free(&mut self, lsn: Lsn) {
        trace!("setting Segment with lsn {:?} to Free", self.lsn());
        assert!(self.is_draining());
        assert!(lsn >= self.lsn());
        self.present.clear();
        self.not_yet_replaced.clear();
        self.removed.clear();
        self.state = Free;
    }

    /// During snapshot recovery, make sure this segment is Active at the
    /// given base lsn. If it was recovered at an older lsn, it is reset
    /// and re-activated at the newer one.
    fn recovery_ensure_initialized(&mut self, lsn: Lsn) {
        if let Some(current_lsn) = self.lsn {
            if current_lsn != lsn {
                assert!(lsn > current_lsn);
                trace!("(snapshot) recovering segment with base lsn {}", lsn);
                // Force the Free precondition so `free_to_active` accepts
                // the re-initialization.
                self.state = Free;
                self.free_to_active(lsn);
            }
        } else {
            trace!("(snapshot) recovering segment with base lsn {}", lsn);
            self.free_to_active(lsn);
        }
    }

    // Base lsn accessor; panics if the segment was never activated.
    fn lsn(&self) -> Lsn {
        self.lsn.unwrap()
    }

    /// Record that `pid` was written into this (Active) segment at its
    /// current base lsn.
    fn insert_pid(&mut self, pid: PageId, lsn: Lsn) {
        assert_eq!(lsn, self.lsn.unwrap());
        assert_eq!(
            self.state, Active,
            "expected segment with lsn {} to be Active",
            lsn
        );
        assert!(!self.removed.contains(&pid));
        self.not_yet_replaced.insert(pid);
        self.present.insert(pid);
    }

    /// Record that `pid` no longer lives in this segment. Removing from an
    /// Active segment is only legal during recovery (where the removal may
    /// be observed before the insertion's segment is replayed); removing
    /// from a Free segment is always a bug.
    fn remove_pid(&mut self, pid: PageId, lsn: Lsn, in_recovery: bool) {
        assert!(lsn >= self.lsn.unwrap());
        match self.state {
            Active => {
                if !in_recovery {
                    panic!("remove_pid called on Active segment");
                }
                assert!(
                    !self.present.contains(&pid),
                    "did not expect present to contain pid {} during recovery",
                    pid,
                );
                self.removed.insert(pid);
            }
            Inactive | Draining => {
                self.present.remove(&pid);
                self.removed.insert(pid);
            }
            Free => panic!("remove_pid called on a Free Segment"),
        }
    }

    /// Queue (pid, old segment) replacement records to be applied when
    /// this segment is deactivated (see `active_to_inactive`).
    fn defer_replace_pids(
        &mut self,
        deferred: FastSet8<(PageId, usize)>,
        lsn: Lsn,
    ) {
        assert!(lsn >= self.lsn.unwrap());
        self.deferred_replacements.extend(deferred);
    }

    /// Remove a blob referred to by this segment. While Active, removal is
    /// deferred (a later write in this same segment may still reference
    /// it); once Inactive or Draining, the blob file is removed directly.
    fn remove_blob(
        &mut self,
        blob_ptr: BlobPointer,
        config: &Config,
    ) -> Result<()> {
        match self.state {
            Active => {
                self.deferred_rm_blob.insert(blob_ptr);
            }
            Inactive | Draining => {
                trace!(
                    "directly removing blob {} that was referred-to \
                     in a segment that has already been marked as Inactive \
                     or Draining.",
                    blob_ptr,
                );
                remove_blob(blob_ptr, config)?;
            }
            Free => panic!("remove_blob called on a Free Segment"),
        }
        Ok(())
    }

    /// Percentage (0-100) of this segment's tracked items that are still
    /// live. An untouched segment reports 100.
    fn live_pct(&self) -> u8 {
        let total = self.present.len() + self.removed.len();
        if total == 0 {
            return 100;
        }
        let live = self.present.len() * 100 / total;
        assert!(live <= 100);
        live as u8
    }

    // A segment may be freed once it is Draining and holds no live pages.
    fn can_free(&self) -> bool {
        self.state == Draining && self.is_empty()
    }

    fn is_empty(&self) -> bool {
        self.present.is_empty()
    }
}
impl SegmentAccountant {
    /// Build a SegmentAccountant from the configuration and a recovered
    /// snapshot, rebuilding all per-segment occupancy state.
    pub(super) fn start(
        config: Config,
        snapshot: Snapshot,
    ) -> Result<Self> {
        let mut ret = Self {
            config,
            segments: vec![],
            clean_counter: 0,
            free: VecSet::default(),
            tip: 0,
            // -1 so the first call to `stabilize` always proceeds.
            max_stabilized_lsn: -1,
            to_clean: VecSet::default(),
            pause_rewriting: false,
            ordering: BTreeMap::default(),
            async_truncations: Vec::default(),
            deferred_free_segments: None,
            deferred_free_segments_after: 0,
        };
        // Linear mode never reuses segments: keep rewriting paused forever
        // (`resume_rewriting` also refuses to unpause in Linear mode).
        if let SegmentMode::Linear = ret.config.segment_mode {
            ret.pause_rewriting();
        }
        ret.initialize_from_snapshot(snapshot)?;
        Ok(ret)
    }

    /// Rebuild segment state from a recovery snapshot: tally live bytes
    /// per segment, raise the file tip, free or drain underused segments,
    /// defer freeing of segments with no recovered lsn, and seed the
    /// lsn -> lid `ordering` map.
    fn initialize_from_snapshot(&mut self, snapshot: Snapshot) -> Result<()> {
        let io_buf_size = self.config.io_buf_size;
        let file_len = self.config.file.metadata()?.len();
        let empty_snapshot = snapshot.pt.is_empty();
        // Whole segments in the file, plus one more if a trailing partial
        // segment is large enough to contain at least a segment header.
        let number_of_segments = usize::try_from(file_len / io_buf_size as u64)
            .unwrap()
            + if empty_snapshot
                || file_len % u64::try_from(io_buf_size).unwrap()
                    < u64::try_from(SEG_HEADER_LEN).unwrap()
            {
                0
            } else {
                1
            };
        if empty_snapshot {
            assert_eq!(number_of_segments, 0);
        }
        let mut segments = vec![Segment::default(); number_of_segments];
        let mut segment_sizes = vec![0_usize; number_of_segments];
        // `segments` is passed in as a parameter (rather than captured)
        // so the closure can borrow it mutably while also capturing
        // `segment_sizes` mutably.
        let mut add = |pid,
                       lsn,
                       sz,
                       lid: LogId,
                       segments: &mut Vec<Segment>| {
            let idx = assert_usize(lid / io_buf_size as LogId);
            trace!(
                "adding lsn: {} lid: {} for pid {} to segment {} during SA recovery",
                lsn,
                lid,
                pid,
                idx
            );
            // Round the message lsn down to its segment's base lsn.
            let segment_lsn = lsn / io_buf_size as Lsn * io_buf_size as Lsn;
            segments[idx].recovery_ensure_initialized(segment_lsn);
            segments[idx].insert_pid(pid, segment_lsn);
            segment_sizes[idx] += sz;
        };
        // Replay the snapshot's page table into per-segment occupancy.
        for (pid, state) in snapshot.pt {
            match state {
                PageState::Present(coords) => {
                    for (lsn, ptr, sz) in coords {
                        add(pid, lsn, sz, ptr.lid(), &mut segments);
                    }
                }
                PageState::Free(lsn, ptr) => {
                    add(pid, lsn, MSG_HEADER_LEN, ptr.lid(), &mut segments);
                }
            }
        }
        // The segment that was receiving writes when the snapshot was
        // taken must not be freed or drained below; usize::max_value()
        // acts as a "no such segment" sentinel.
        let currently_active_segment = {
            let prospective_currently_active_segment =
                (snapshot.last_lid / io_buf_size as LogId) as usize;
            if let Some(segment) =
                segments.get(prospective_currently_active_segment)
            {
                if segment.is_empty() {
                    usize::max_value()
                } else {
                    prospective_currently_active_segment
                }
            } else {
                usize::max_value()
            }
        };
        assert!(self.config.segment_cleanup_threshold < 100.);
        let cleanup_threshold =
            (self.config.segment_cleanup_threshold * 100.) as usize;
        // Segments holding fewer than this many live bytes are drained.
        let drain_sz = io_buf_size * 100 / cleanup_threshold;
        let mut deferred_free_segments = vec![];
        for (idx, segment) in segments.iter_mut().enumerate() {
            let segment_base = idx as LogId * io_buf_size as LogId;
            if segment_base >= self.tip {
                self.tip = segment_base + io_buf_size as LogId;
                trace!(
                    "raised self.tip to {} during SA initialization",
                    self.tip
                );
            }
            let segment_lsn = if let Some(lsn) = segment.lsn {
                lsn
            } else {
                // No lsn was recovered for this segment; defer freeing it
                // until stabilization passes the snapshot's last lsn.
                deferred_free_segments.push(segment_base);
                continue;
            };
            // Only segments that are fully behind the stable header lsn
            // (and not the currently-active one) may be freed or drained.
            if idx != currently_active_segment
                && segment_lsn + io_buf_size as Lsn
                    <= snapshot.max_header_stable_lsn
            {
                if segment_sizes[idx] == 0 {
                    trace!(
                        "freeing segment with lid {} during SA initialization",
                        segment_base
                    );
                    // If this empty segment is the very last one, shrink
                    // the tip instead of putting it on the free list.
                    if self.tip == segment_base + io_buf_size as LogId {
                        self.tip -= io_buf_size as LogId;
                    } else {
                        segment.state = Free;
                        self.free_segment(segment_base, true);
                    }
                    trace!(
                        "zeroing segment with lid {} during SA initialization",
                        segment_base
                    );
                    maybe_fail!("segment initial free zero");
                    // Overwrite the stale header so it cannot be mistaken
                    // for valid data on a later recovery.
                    self.config.file.pwrite_all(
                        &*vec![MessageKind::Corrupted.into(); SEG_HEADER_LEN],
                        segment_base,
                    )?;
                    if !self.config.temporary {
                        self.config.file.sync_all()?;
                    }
                } else if segment_sizes[idx] <= drain_sz {
                    trace!(
                        "SA draining segment at {} during startup \
                         with size {} being < drain size of {}",
                        segment_base,
                        segment_sizes[idx],
                        drain_sz
                    );
                    segment.state = Draining;
                    self.to_clean.insert(segment_base);
                }
            }
        }
        if !deferred_free_segments.is_empty() {
            trace!(
                "setting self.deferred_free_segments to {:?} to be \
                 freed after lsn {}",
                deferred_free_segments,
                snapshot.last_lsn,
            );
            self.deferred_free_segments = Some(deferred_free_segments);
            self.deferred_free_segments_after = snapshot.last_lsn;
        }
        trace!("initialized self.segments to {:?}", segments);
        self.segments = segments;
        // Seed the lsn -> lid ordering from every initialized segment.
        self.ordering = self
            .segments
            .iter()
            .enumerate()
            .filter_map(|(id, s)| {
                if s.lsn.is_some() {
                    Some((s.lsn(), id as LogId * io_buf_size as LogId))
                } else {
                    None
                }
            })
            .collect();
        trace!("initialized self.ordering to {:?}", self.ordering);
        Ok(())
    }

    /// Put a (already Free-state) segment onto the free list. During
    /// recovery, also drop its stale entry from `ordering`.
    fn free_segment(&mut self, lid: LogId, in_recovery: bool) {
        debug!("freeing segment {}", lid);
        debug!("free list before free {:?}", self.free);
        let idx = self.lid_to_idx(lid);
        assert!(
            self.tip > lid,
            "freed a segment above our current file tip, \
             please report this bug!"
        );
        assert_eq!(self.segments[idx].state, Free);
        assert!(
            !self.free.contains(&lid),
            "double-free of a segment occurred"
        );
        if in_recovery {
            if let Some(old_lsn) = self.segments[idx].lsn {
                trace!(
                    "removing segment {} with lsn {} from ordering",
                    lid,
                    old_lsn
                );
                self.ordering.remove(&old_lsn);
            }
        }
        self.free.insert(lid);
    }

    /// Stop reusing freed segments; `next` will only extend the file tip.
    pub(super) fn pause_rewriting(&mut self) {
        self.pause_rewriting = true;
    }

    /// Resume reusing freed segments, unless the configuration is Linear
    /// (which never rewrites).
    pub(super) fn resume_rewriting(&mut self) {
        if self.config.segment_mode != SegmentMode::Linear {
            self.pause_rewriting = false;
        }
    }

    /// Record that `pid` was rewritten at `new_ptr`, superseding all of
    /// `old_ptrs`. Old-segment removals are deferred (attached to the new
    /// segment) and applied when the new segment is deactivated, so that
    /// concurrent readers of the old locations are not invalidated early.
    pub(super) fn mark_replace(
        &mut self,
        pid: PageId,
        lsn: Lsn,
        old_ptrs: Vec<DiskPtr>,
        new_ptr: DiskPtr,
    ) -> Result<()> {
        let _measure = Measure::new(&M.accountant_mark_replace);
        trace!(
            "mark_replace pid {} from ptrs {:?} to ptr {} with lsn {}",
            pid,
            old_ptrs,
            new_ptr,
            lsn
        );
        let new_idx = self.lid_to_idx(new_ptr.lid());
        let new_segment_start =
            new_idx as LogId * self.config.io_buf_size as LogId;
        assert!(!self.to_clean.contains(&new_segment_start));
        // NOTE(review): blob removal is skipped when the single old
        // pointer is itself a blob — presumably the blob value is being
        // carried over to the new location; confirm with `DiskPtr` docs.
        let schedule_rm_blob = !(old_ptrs.len() == 1 && old_ptrs[0].is_blob());
        let mut deferred_replacements = FastSet8::default();
        for old_ptr in old_ptrs {
            let old_lid = old_ptr.lid();
            if schedule_rm_blob && old_ptr.is_blob() {
                trace!(
                    "queueing blob removal for {} in our own segment",
                    old_ptr
                );
                // Queue the removal on the NEW segment so it happens only
                // once the new write is itself stable.
                self.segments[new_idx]
                    .remove_blob(old_ptr.blob().1, &self.config)?;
            }
            let old_idx = self.lid_to_idx(old_lid);
            if new_idx == old_idx {
                // Replacement landed in the same segment; nothing to defer.
                continue;
            }
            if self.segments[old_idx].lsn() > lsn {
                panic!(
                    "mark_replace called on previous version of segment. \
                     this means it was reused while other threads still \
                     had references to it."
                );
            }
            if self.segments[old_idx].state == Free {
                panic!(
                    "mark_replace called on Free segment with lid {}. \
                     this means it was dropped while other threads still had \
                     references to it.",
                    old_idx * self.config.io_buf_size
                );
            }
            self.segments[old_idx].not_yet_replaced.remove(&pid);
            deferred_replacements.insert((pid, old_idx));
        }
        self.segments[new_idx].defer_replace_pids(deferred_replacements, lsn);
        // The new location itself is tracked like any other link.
        self.mark_link(pid, lsn, new_ptr);
        Ok(())
    }

    /// Advance a segment through Inactive -> Draining -> Free as its
    /// occupancy allows, updating `to_clean` and the free list.
    fn possibly_clean_or_free_segment(&mut self, idx: usize, lsn: Lsn) {
        let can_drain = segment_is_drainable(
            idx,
            self.segments.len(),
            self.segments[idx].live_pct(),
            self.segments[idx].len(),
            &self.config,
        ) && self.segments[idx].is_inactive();
        let segment_start = (idx * self.config.io_buf_size) as LogId;
        if can_drain {
            trace!(
                "SA inserting {} into to_clean from possibly_clean_or_free_segment",
                segment_start
            );
            self.segments[idx].inactive_to_draining(lsn);
            self.to_clean.insert(segment_start);
        }
        if self.segments[idx].can_free() {
            self.segments[idx].draining_to_free(lsn);
            self.to_clean.remove(&segment_start);
            trace!(
                "freed segment {} in possibly_clean_or_free_segment",
                segment_start
            );
            self.free_segment(segment_start, false);
        }
    }

    /// Pick a pid that should be rewritten to help drain a `to_clean`
    /// segment, or `None` if there's nothing to do (or the chosen pid is
    /// `ignore_pid`). `clean_counter` rotates both the segment choice and
    /// the pid choice so the work is spread out across calls.
    pub(super) fn clean(&mut self, ignore_pid: PageId) -> Option<PageId> {
        let seg_offset = if self.to_clean.is_empty() || self.to_clean.len() == 1
        {
            0
        } else {
            self.clean_counter % self.to_clean.len()
        };
        let item = self.to_clean.get(seg_offset).cloned();
        if let Some(lid) = item {
            let idx = self.lid_to_idx(lid);
            let segment = &self.segments[idx];
            assert!(segment.state == Draining || segment.state == Inactive);
            let present = &segment.not_yet_replaced;
            if present.is_empty() {
                return None;
            }
            self.clean_counter += 1;
            let offset = if present.len() == 1 {
                0
            } else {
                self.clean_counter % present.len()
            };
            let pid = present.iter().nth(offset).unwrap();
            if *pid == ignore_pid {
                return None;
            }
            trace!("telling caller to clean {} from segment at {}", pid, lid,);
            return Some(*pid);
        }
        None
    }

    /// Record that `pid` received a link write at `ptr`. The target
    /// segment must still be the one whose base lsn matches the write's
    /// lsn — otherwise it was reused out from under us.
    pub(super) fn mark_link(&mut self, pid: PageId, lsn: Lsn, ptr: DiskPtr) {
        let _measure = Measure::new(&M.accountant_mark_link);
        trace!("mark_link pid {} at ptr {}", pid, ptr);
        let idx = self.lid_to_idx(ptr.lid());
        let new_segment_start = idx as LogId * self.config.io_buf_size as LogId;
        assert!(!self.to_clean.contains(&new_segment_start));
        let segment = &mut self.segments[idx];
        let segment_lsn = lsn / self.config.io_buf_size as Lsn
            * self.config.io_buf_size as Lsn;
        assert_eq!(
            segment.lsn(),
            segment_lsn,
            "segment somehow got reused by the time a link was \
             marked on it. expected lsn: {} actual: {}",
            segment_lsn,
            segment.lsn()
        );
        segment.insert_pid(pid, segment_lsn);
    }

    /// Note that the log is stable up to `stable_lsn`: deactivate every
    /// segment whose writes are now fully durable, and release any
    /// deferred-free segments once stabilization passes their watermark.
    pub(super) fn stabilize(&mut self, stable_lsn: Lsn) -> Result<()> {
        let io_buf_size = self.config.io_buf_size as Lsn;
        // Normalize down to the base of the last FULLY stable segment
        // (one whole segment behind the raw stable lsn).
        let lsn = ((stable_lsn / io_buf_size) - 1) * io_buf_size;
        trace!(
            "stabilize({}), normalized: {}, last: {}",
            stable_lsn,
            lsn,
            self.max_stabilized_lsn
        );
        if self.max_stabilized_lsn >= lsn {
            trace!(
                "expected stabilization lsn {} \
                 to be greater than the previous value of {}",
                lsn,
                self.max_stabilized_lsn
            );
            return Ok(());
        }
        if self.deferred_free_segments.is_some()
            && lsn > self.deferred_free_segments_after
        {
            let deferred_free_segments =
                self.deferred_free_segments.take().unwrap();
            for segment_base in deferred_free_segments {
                let idx = self.lid_to_idx(segment_base);
                self.segments[idx].state = Free;
                self.free_segment(segment_base, false);
            }
        }
        // Deactivate every segment with a base lsn in
        // (max_stabilized_lsn, lsn].
        let bounds = (
            std::ops::Bound::Excluded(self.max_stabilized_lsn),
            std::ops::Bound::Included(lsn),
        );
        let can_deactivate = self
            .ordering
            .range(bounds)
            .map(|(lsn, _lid)| *lsn)
            .collect::<Vec<_>>();
        self.max_stabilized_lsn = lsn;
        for lsn in can_deactivate {
            self.deactivate_segment(lsn)?;
        }
        Ok(())
    }

    /// Deactivate the segment whose base lsn is `lsn`, applying its
    /// deferred replacements to the segments they vacate, then possibly
    /// draining/freeing those old segments. Finally, if too much of the
    /// file is free relative to inactive, nudge the highest inactive
    /// segment toward cleaning so the file can shrink.
    fn deactivate_segment(&mut self, lsn: Lsn) -> Result<()> {
        let lid = self.ordering[&lsn];
        let idx = self.lid_to_idx(lid);
        trace!(
            "deactivating segment with lsn {}: {:?}",
            lsn,
            self.segments[idx]
        );
        let replacements = if self.segments[idx].state == Active {
            self.segments[idx].active_to_inactive(lsn, false, &self.config)?
        } else {
            Default::default()
        };
        let mut old_segments = FastSet8::default();
        for &(pid, old_idx) in &replacements {
            old_segments.insert(old_idx);
            let old_segment = &mut self.segments[old_idx];
            assert!(
                old_segment.state != Active && old_segment.state != Free,
                "segment {} is processing pid {} replacements for \
                 old segment {}, which is in the {:?} state. \
                 all replacements for pid: {:?}",
                lid,
                pid,
                old_idx * self.config.io_buf_size,
                old_segment.state,
                replacements
                    .iter()
                    .filter(|(p, _)| p == &pid)
                    .collect::<Vec<_>>()
            );
            // Extra integrity check, only compiled with the event_log
            // feature.
            #[cfg(feature = "event_log")]
            assert!(
                old_segment.present.contains(&pid),
                "we expect deferred replacements to provide \
                 all previous segments so we can clean them. \
                 pid {} old_ptr segment: {} segments with pid: {:?}",
                pid,
                old_idx * self.config.io_buf_size,
                self.segments
                    .iter()
                    .enumerate()
                    .filter_map(|(i, s)| {
                        if s.present.contains(&pid) {
                            Some((
                                i * self.config.io_buf_size,
                                s.state,
                                s.present.clone(),
                            ))
                        } else {
                            None
                        }
                    })
                    .collect::<Vec<_>>()
            );
            old_segment.remove_pid(pid, lsn, false);
        }
        for old_idx in old_segments {
            self.possibly_clean_or_free_segment(old_idx, lsn);
        }
        // If the free:inactive ratio exceeds the cleanup threshold, queue
        // the highest-offset inactive segment for cleaning so the file
        // tail can eventually be truncated.
        let free_segs = self.segments.iter().filter(|s| s.is_free()).count();
        let inactive_segs =
            self.segments.iter().filter(|s| s.is_inactive()).count();
        let free_ratio = (free_segs * 100) / (1 + free_segs + inactive_segs);
        if free_ratio >= (self.config.segment_cleanup_threshold * 100.) as usize
            && inactive_segs > 5
        {
            let last_index =
                self.segments.iter().rposition(Segment::is_inactive).unwrap();
            let segment_start = (last_index * self.config.io_buf_size) as LogId;
            self.to_clean.insert(segment_start);
        }
        Ok(())
    }

    /// Advance the file tip by one segment and return the previous tip.
    /// Completed async truncations are drained first; their failures are
    /// logged but not propagated (best-effort file shrinking).
    fn bump_tip(&mut self) -> LogId {
        let truncations = mem::replace(&mut self.async_truncations, Vec::new());
        for truncation in truncations {
            match truncation.wait() {
                Some(Ok(())) => {}
                error => {
                    error!("failed to shrink file: {:?}", error);
                }
            }
        }
        let lid = self.tip;
        self.tip += self.config.io_buf_size as LogId;
        trace!("advancing file tip from {} to {}", lid, self.tip);
        lid
    }

    /// Hand out the next segment for writes at segment-aligned `lsn`:
    /// either a reusable freed segment or a fresh one at the file tip.
    pub(super) fn next(&mut self, lsn: Lsn) -> Result<LogId> {
        let _measure = Measure::new(&M.accountant_next);
        assert_eq!(
            lsn % self.config.io_buf_size as Lsn,
            0,
            "unaligned Lsn provided to next!"
        );
        // Only segments whose previous contents are fully behind the
        // stabilized lsn are safe to reuse.
        let free: Vec<LogId> = self
            .free
            .iter()
            .filter(|lid| {
                let idx =
                    usize::try_from(*lid / self.config.io_buf_size as LogId)
                        .unwrap();
                if let Some(last_lsn) = self.segments[idx].lsn {
                    last_lsn < self.max_stabilized_lsn
                } else {
                    true
                }
            })
            .copied()
            .collect();
        trace!("evaluating free list {:?} in SA::next", free);
        // Opportunistically truncate the file: while the segment directly
        // below the tip is free (and we'd still have a spare), give that
        // space back to the filesystem instead of reusing it.
        while self.tip != 0 && self.free.len() > 1 {
            let last_segment = self.tip - self.config.io_buf_size as LogId;
            if free.contains(&last_segment) {
                self.free.remove(&last_segment);
                self.truncate(last_segment)?;
            } else {
                break;
            }
        }
        let safe = free.first();
        let lid = match (self.pause_rewriting, safe) {
            (true, _) | (_, None) => self.bump_tip(),
            (_, Some(&next)) => {
                self.free.remove(&next);
                next
            },
        };
        let idx = self.lid_to_idx(lid);
        assert_eq!(self.segments[idx].state, Free);
        // Replace any stale ordering entry with the segment's new lsn.
        if let Some(old_lsn) = self.segments[idx].lsn {
            self.ordering.remove(&old_lsn);
        }
        self.segments[idx].free_to_active(lsn);
        self.ordering.insert(lsn, lid);
        // Deferred-free segments occupy file space that the lsn has not
        // yet accounted for, so the lid may legitimately run ahead of the
        // lsn by that much.
        let lid_slack = self
            .deferred_free_segments
            .as_ref()
            .map_or(0, |dfs| dfs.len() * self.config.io_buf_size);
        debug!(
            "segment accountant returning offset: {} \
             paused: {} on deck: {:?} lid_slack: {}",
            lid, self.pause_rewriting, self.free, lid_slack
        );
        assert!(lsn + lid_slack as Lsn >= lid as Lsn);
        Ok(lid)
    }

    /// Iterate over (lsn, lid) pairs for all segments at or after the
    /// segment containing `lsn`, in lsn order. Rewriting must be paused
    /// first so the ordering cannot shift mid-iteration.
    pub(super) fn segment_snapshot_iter_from(
        &mut self,
        lsn: Lsn,
    ) -> Box<dyn Iterator<Item = (Lsn, LogId)>> {
        assert!(
            !self.ordering.is_empty(),
            "expected ordering to have been initialized already"
        );
        assert!(
            self.pause_rewriting,
            "must pause rewriting before \
             iterating over segments"
        );
        let segment_len = self.config.io_buf_size as Lsn;
        let normalized_lsn = lsn / segment_len * segment_len;
        trace!(
            "generated iterator over {:?} where lsn >= {}",
            self.ordering,
            normalized_lsn
        );
        Box::new(
            self.ordering
                .clone()
                .into_iter()
                .filter(move |&(l, _)| l >= normalized_lsn),
        )
    }

    /// Shrink the file to `at` (segment-aligned) bytes. The actual
    /// `set_len` + sync happens on a background thread; the promise is
    /// queued and awaited later in `bump_tip`.
    fn truncate(&mut self, at: LogId) -> Result<()> {
        assert_eq!(
            at % self.config.io_buf_size as LogId,
            0,
            "new length must be io-buf-len aligned"
        );
        self.tip = at;
        assert!(
            !self.free.contains(&at),
            "double-free of a segment occurred"
        );
        trace!("asynchronously truncating file to length {}", at);
        let (completer, promise) = Promise::pair();
        let config = self.config.clone();
        let _result = threadpool::spawn(move || {
            debug!("truncating file to length {}", at);
            let res = config
                .file
                .set_len(at)
                .and_then(|_| config.file.sync_all())
                .map_err(|e| e.into());
            completer.fill(res);
        });
        // Under test, surface spawn failures immediately.
        #[cfg(any(test, feature = "check_snapshot_integrity"))]
        _result.unwrap();
        self.async_truncations.push(promise);
        Ok(())
    }

    /// Map a segment base offset to its index in `self.segments`, growing
    /// the vector with default (Free) segments as needed.
    fn lid_to_idx(&mut self, lid: LogId) -> usize {
        let idx = assert_usize(lid / self.config.io_buf_size as LogId);
        if self.segments.len() < idx + 1 {
            self.segments.resize(idx + 1, Segment::default());
        }
        idx
    }
}
// Segment-reuse policy for the log file.
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub enum SegmentMode {
    // Never reuse freed segments: always append at the file tip
    // (rewriting stays permanently paused — see `SegmentAccountant::start`
    // and `resume_rewriting`).
    Linear,
    // Garbage-collecting mode: freed segments may be handed back out by
    // `SegmentAccountant::next`.
    Gc,
}
/// Decide whether the segment at `idx` (out of `num_segments`) should be
/// drained, based on how much of it is live (`live_pct`), how many items
/// it holds (`len`), and the configured cleanup threshold/skew.
///
/// Segments earlier in the file get a higher effective threshold (are
/// drained more eagerly) so that the file can shrink from the back.
fn segment_is_drainable(
    idx: usize,
    num_segments: usize,
    live_pct: u8,
    len: usize,
    config: &Config,
) -> bool {
    // Base threshold as a whole-number percentage.
    let base_cleanup_threshold =
        (config.segment_cleanup_threshold * 100.) as usize;
    let cleanup_skew = config.segment_cleanup_skew;

    // This segment's position in the file as a percentage; with no
    // segments at all, pretend we are in the middle.
    let relative_prop = match num_segments {
        0 => 50,
        n => idx * 100 / n,
    };

    // Skew contribution: largest for segments near the start of the file.
    let skewed_addition = cleanup_skew * (100 - relative_prop) / 100;

    // Clamp the combined threshold into 1..=99 so the divisions below are
    // well-defined and the percentage stays meaningful.
    let cleanup_threshold =
        (base_cleanup_threshold + skewed_addition).max(1).min(99);

    // Drain when too little of the segment is live, or when it simply
    // holds too few items to be worth keeping around.
    let too_sparse = live_pct as usize <= cleanup_threshold;
    let too_few_items =
        len < MINIMUM_ITEMS_PER_SEGMENT * 100 / cleanup_threshold;
    too_sparse || too_few_items
}