use std::{
collections::HashSet,
time::{Duration, Instant},
};
use crate::{
arena::ArenaHash,
backend::OnDiskObject,
db::{DB, Update},
};
/// Incrementally-updated proportional model `y ≈ a * x` with no intercept.
///
/// The slope `a` is the ratio of the running sums `Σy / Σx` over every
/// measured sample, so only two accumulators are stored and each
/// measurement costs O(1).
#[derive(Debug, Default)]
struct IncrementalLinRegress {
    /// Running sum of all measured `x` values.
    sum_x: f64,
    /// Running sum of all measured `y` values.
    sum_y: f64,
}

impl IncrementalLinRegress {
    /// Records one `(x, y)` sample by adding it to the running sums.
    fn measure(&mut self, x: f64, y: f64) {
        let Self { sum_x, sum_y } = self;
        *sum_x += x;
        *sum_y += y;
    }

    /// Predicts `y` for the given `x` as `(Σy / Σx) * x`.
    ///
    /// Returns `None` whenever the result is not finite. This happens, for
    /// example, while `Σx` is still zero: the slope is then infinite or NaN.
    fn predict(&self, x: f64) -> Option<f64> {
        let slope = self.sum_y / self.sum_x;
        Some(slope * x).filter(|y| y.is_finite())
    }
}
/// Online throughput models used to size GC batches to the remaining budget.
///
/// Each model maps elapsed wall-clock microseconds (`x`) to the number of
/// items processed in that time (`y`). `read_model` is fed by
/// [`RunningBenchmark::read_batch_measurement`] and consulted by
/// [`RunningBenchmark::read_batch_size`]. `scan_model` is used the same way
/// by the `scan_*` methods.
#[derive(Debug, Default)]
struct RunningBenchmark {
    read_model: IncrementalLinRegress,
    scan_model: IncrementalLinRegress,
}

/// Batch size used whenever a model cannot give a finite prediction
/// (for example before any non-zero time has been measured).
const DEFAULT_BATCH_SIZE: usize = 128;
/// Hard cap on a single batch, whatever the model predicts.
const BATCH_LIMIT: usize = 4096;

impl RunningBenchmark {
    /// Shared sizing rule for both models:
    ///
    /// * zero budget → `None` (stop);
    /// * no finite prediction → `Some(DEFAULT_BATCH_SIZE)`;
    /// * prediction above `BATCH_LIMIT` → `Some(BATCH_LIMIT)`;
    /// * prediction above `min_exclusive` → the prediction rounded up;
    /// * otherwise → `None` (budget too small for a batch).
    fn batch_size_from(
        model: &IncrementalLinRegress,
        budget: Duration,
        min_exclusive: f64,
    ) -> Option<usize> {
        if budget.is_zero() {
            return None;
        }
        let Some(predicted) = model.predict(budget.as_micros() as f64) else {
            return Some(DEFAULT_BATCH_SIZE);
        };
        if predicted > BATCH_LIMIT as f64 {
            Some(BATCH_LIMIT)
        } else if predicted > min_exclusive {
            Some(predicted.ceil() as usize)
        } else {
            None
        }
    }

    /// Number of nodes to fetch in the next read batch, or `None` to stop.
    fn read_batch_size(&self, budget: Duration) -> Option<usize> {
        // Any strictly positive prediction rounds up to at least one node.
        Self::batch_size_from(&self.read_model, budget, 0.0)
    }

    /// Records that reading `batch_size` nodes took `took`.
    fn read_batch_measurement(&mut self, batch_size: usize, took: Duration) {
        let elapsed_us = took.as_micros() as f64;
        self.read_model.measure(elapsed_us, batch_size as f64);
    }

    /// Number of entries to request in the next scan batch, or `None` to stop.
    fn scan_batch_size(&self, budget: Duration) -> Option<usize> {
        // NOTE(review): unlike reads, a predicted scan batch of <= 1 entry
        // stops the sweep instead of rounding up; confirm the asymmetry is
        // intended.
        Self::batch_size_from(&self.scan_model, budget, 1.0)
    }

    /// Records that scanning `batch_size` entries took `took`.
    fn scan_batch_measurement(&mut self, batch_size: usize, took: Duration) {
        let elapsed_us = took.as_micros() as f64;
        self.scan_model.measure(elapsed_us, batch_size as f64);
    }
}
#[derive(Debug)]
pub(crate) struct GcState<D: DB> {
rescan: bool,
last_roots: HashSet<ArenaHash<D::Hasher>>,
grey_set: HashSet<ArenaHash<D::Hasher>>,
mark_set: HashSet<ArenaHash<D::Hasher>>,
sweep_resume: Option<D::ScanResumeHandle>,
running_bench: RunningBenchmark,
}
impl<D: DB> Default for GcState<D> {
fn default() -> Self {
GcState {
rescan: true,
last_roots: Default::default(),
grey_set: Default::default(),
mark_set: Default::default(),
sweep_resume: None,
running_bench: Default::default(),
}
}
}
impl<D: DB> GcState<D> {
    /// Requests that the next [`GcState::run`] rebuild the whole root set,
    /// including the roots reported by the database.
    pub(crate) fn force_rescan(&mut self) {
        self.rescan = true;
    }

    /// Performs one time-budgeted step of incremental mark-and-sweep GC.
    ///
    /// Progress is carried across calls in `self`. The phases are:
    /// 1. **Roots.** On a rescan, the root set is rebuilt from `roots` plus
    ///    `db_roots(db)`. Otherwise, only roots not seen before are added.
    /// 2. **Mark from cache.** The grey set is traversed through
    ///    `cache_read` as far as cached objects allow. This phase does not
    ///    consult the budget.
    /// 3. **Mark from DB.** Remaining grey nodes are fetched in batches via
    ///    `batch_get_nodes` while `remaining_budget()` permits.
    /// 4. **Sweep.** Once nothing is grey, the DB is scanned in batches and
    ///    every node not in the mark set is deleted.
    ///
    /// When a sweep completes, the state is reset so that the next call
    /// starts a new cycle with a full rescan.
    ///
    /// # Parameters
    /// * `roots`: caller-supplied roots for this step.
    /// * `remaining_budget`: queried before each DB batch. A zero (or
    ///   too-small) budget ends the current phase for this call.
    /// * `db`: the database being collected.
    /// * `cache_read`: cache lookup. `None` defers the node to phase 3.
    /// * `db_roots`: yields the database's own roots; only called on a rescan.
    ///
    /// # Returns
    /// Hashes of the nodes deleted during this call. The result is empty
    /// while marking is still in progress.
    pub(crate) fn run<'a, 'b: 'a>(
        &'b mut self,
        roots: impl Iterator<Item = ArenaHash<D::Hasher>>,
        remaining_budget: impl Fn() -> Duration,
        db: &'b mut D,
        cache_read: impl Fn(ArenaHash<D::Hasher>) -> Option<&'a OnDiskObject<D::Hasher>>,
        db_roots: impl for<'c> FnOnce(&'c mut D) -> Vec<ArenaHash<D::Hasher>>,
    ) -> Vec<ArenaHash<D::Hasher>> {
        // Phase 1: roots.
        if self.rescan {
            self.last_roots = roots.chain(db_roots(db)).collect();
            self.grey_set.extend(
                self.last_roots
                    .iter()
                    .filter(|r| !self.mark_set.contains(r))
                    .cloned(),
            );
            self.rescan = false;
        } else {
            for root in roots {
                // Only newly seen roots that are not yet marked need
                // traversal. A marked root's children were already greyed
                // when it was visited.
                if self.last_roots.insert(root.clone()) && !self.mark_set.contains(&root) {
                    self.grey_set.insert(root);
                }
            }
        }

        // Phase 2: breadth-first marking through the cache. Nodes missing
        // from the cache stay grey for phase 3.
        let mut to_process = self.grey_set.clone();
        while !to_process.is_empty() {
            let mut next = HashSet::new();
            for hash in to_process.into_iter() {
                if let Some(obj) = cache_read(hash.clone()) {
                    self.mark_set.insert(hash.clone());
                    self.grey_set.remove(&hash);
                    for child in obj.children.iter().flat_map(|c| c.refs().into_iter()) {
                        if !self.mark_set.contains(child) && !self.grey_set.contains(child) {
                            self.grey_set.insert(child.clone());
                            next.insert(child.clone());
                        }
                    }
                }
            }
            to_process = next;
        }

        // Phase 3: marking from the DB in budget-sized batches.
        while !self.grey_set.is_empty()
            && let Some(batch_size) = self.running_bench.read_batch_size(remaining_budget())
        {
            let batch_start = Instant::now();
            let batch: Vec<_> = self.grey_set.iter().take(batch_size).cloned().collect();
            let batch_size = batch.len();
            let batch_read = db.batch_get_nodes(batch.iter().cloned());
            // Retire the whole batch from the frontier: every requested node
            // is being visited now. If `batch_get_nodes` ever omitted a
            // requested hash, removing only the returned ones would leave that
            // hash grey forever and the sweep would never start.
            for hash in &batch {
                self.grey_set.remove(hash);
            }
            self.mark_set.extend(batch);
            for (parent, obj) in batch_read {
                self.grey_set.remove(&parent);
                self.mark_set.insert(parent);
                for child in obj
                    .iter()
                    .flat_map(|o| o.children.iter())
                    .flat_map(|c| c.refs().into_iter())
                {
                    if !self.mark_set.contains(child) && !self.grey_set.contains(child) {
                        self.grey_set.insert(child.clone());
                    }
                }
            }
            self.running_bench
                .read_batch_measurement(batch_size, batch_start.elapsed());
        }
        if !self.grey_set.is_empty() {
            // Marking is unfinished; sweeping now would delete live nodes.
            return vec![];
        }

        // Phase 4: sweep unmarked nodes in budget-sized scan batches.
        let mut cull_set = vec![];
        // `sweep_resume` is `None` both before a sweep starts and after it
        // finishes, so completion has to be tracked explicitly. Without this
        // flag, a call whose budget ran out right after marking would be
        // mistaken for a finished sweep: the completed mark set would be
        // discarded and the cycle restarted with nothing swept.
        let mut sweep_finished = false;
        while let Some(batch_size) = self.running_bench.scan_batch_size(remaining_budget()) {
            let batch_start = Instant::now();
            let (nodes, handle) = db.scan(self.sweep_resume.clone(), batch_size);
            self.sweep_resume = handle;
            let batch_size = nodes.len();
            let to_delete = nodes
                .into_iter()
                .map(|(k, _)| k)
                .filter(|k| !self.mark_set.contains(k))
                .collect::<Vec<_>>();
            db.batch_update(to_delete.iter().map(|k| (k.clone(), Update::DeleteNode)));
            cull_set.extend(to_delete);
            self.running_bench
                .scan_batch_measurement(batch_size, batch_start.elapsed());
            if self.sweep_resume.is_none() {
                sweep_finished = true;
                break;
            }
        }
        if !sweep_finished {
            // Out of budget before or during the sweep: keep the mark set and
            // resume handle so the next call continues where this one stopped.
            return cull_set;
        }

        // Sweep complete: start the next cycle from scratch with a full rescan.
        self.rescan = true;
        self.last_roots = HashSet::new();
        self.mark_set = HashSet::new();
        cull_set
    }
}
#[cfg(test)]
mod tests {
/// End-to-end check of GC through the arena's storage backend.
///
/// The assertions check that nodes are culled only once they are both
/// unpersisted and no longer referenced from `refs`. They also check that
/// repeated budget-limited runs eventually cull the same nodes an
/// unlimited run would.
#[test]
fn test_gc() {
use crate::db::DB;
use std::time::Duration;
let arena = crate::arena::Arena::new_from_backend(crate::backend::StorageBackend::new(
1,
crate::db::InMemoryDB::<crate::DefaultHasher>::default(),
));
// Number of entries currently stored in the backing database.
let size = || arena.with_backend(|b| b.database.size());
const CHUNK: usize = 10_000;
// Allocate 10 chunks of values, persist all of them, and flush them to the DB.
let mut refs = (0..10 * CHUNK)
.map(|i| arena.alloc(i as u64))
.collect::<Vec<_>>();
refs.iter_mut().for_each(|r| r.persist());
arena.with_backend(|b| b.flush_all_changes_to_db());
assert_eq!(size(), 10 * CHUNK);
// Everything is persisted and still referenced: nothing is culled.
assert_eq!(arena.with_backend(|b| b.gc(Duration::from_hours(1))), 0);
assert_eq!(size(), 10 * CHUNK);
// Unpersist the last chunk while `refs` still holds it: the assertions
// below expect it to stay alive.
refs[9 * CHUNK..10 * CHUNK]
.iter_mut()
.for_each(|r| r.unpersist());
arena.with_backend(|b| b.flush_all_changes_to_db());
assert_eq!(arena.with_backend(|b| b.gc(Duration::from_hours(1))), 0);
assert_eq!(size(), 10 * CHUNK);
// Drop the references to that unpersisted chunk: exactly CHUNK nodes are culled.
refs.truncate(9 * CHUNK);
assert_eq!(arena.with_backend(|b| b.gc(Duration::from_hours(1))), CHUNK);
assert_eq!(size(), 9 * CHUNK);
// Drop references to a chunk that is still persisted: nothing is culled.
refs.truncate(8 * CHUNK);
assert_eq!(arena.with_backend(|b| b.gc(Duration::from_hours(1))), 0);
assert_eq!(size(), 9 * CHUNK);
// Unpersist chunks 5..8 and drop their references: 3 * CHUNK nodes become
// collectable.
refs[5 * CHUNK..8 * CHUNK]
.iter_mut()
.for_each(|r| r.unpersist());
arena.with_backend(|b| b.flush_all_changes_to_db());
refs.truncate(5 * CHUNK);
// With a zero budget nothing is deleted yet.
assert_eq!(arena.with_backend(|b| b.gc_budgeted(|| Duration::ZERO)), 0);
assert_eq!(size(), 9 * CHUNK);
// Budget that is unlimited for the first ten queries and zero afterwards.
// The counter is reset before each call. Each budget query precedes one
// read or scan batch (assuming `gc_budgeted` forwards this closure to the
// GC unchanged; TODO confirm), so every call does a bounded amount of work.
let batch_counter = std::cell::Cell::new(0usize);
let budget = || {
let n = batch_counter.get();
batch_counter.set(n + 1);
if n < 10 {
Duration::MAX
} else {
Duration::ZERO
}
};
// Repeated budget-limited calls must eventually cull all 3 * CHUNK garbage nodes.
let mut culled = 0;
for _ in 0..100 {
batch_counter.set(0);
culled += arena.with_backend(|b| b.gc_budgeted(budget));
}
assert_eq!(culled, 3 * CHUNK);
assert_eq!(size(), 6 * CHUNK);
}
}