use {
crate::{
ContentHash,
database::{
ContentHashRef,
GenerationEpoch,
HasPartition,
Partition,
Partitions,
handle::RecordHandle,
},
record::{
CollectReferences,
Record,
References,
},
},
dashmap::DashSet,
parking_lot::Mutex,
std::{
collections::VecDeque,
marker::PhantomData,
sync::atomic::{
AtomicU64,
AtomicU8,
Ordering,
},
time::Duration,
},
};
// Raw `u8` encodings of the collector phase. They are stored in
// `GarbageCollector::phase` and translated to `GcPhase` by
// `GarbageCollector::phase()`.

/// No collection cycle is running; `start_marking` only succeeds from this phase.
const GC_PHASE_IDLE: u8 = 0;
/// Marking is in progress; `mark_tick_partition` and `add_to_gray` only act in this phase.
const GC_PHASE_MARKING: u8 = 1;
/// Sweeping is in progress; `sweep_partition` only acts in this phase.
const GC_PHASE_SWEEPING: u8 = 2;
/// Shared state of the mark/sweep garbage collector.
///
/// All fields use interior mutability, so every method takes `&self`.
pub struct GarbageCollector {
/// Current phase, encoded as one of the `GC_PHASE_*` constants.
phase: AtomicU8,
/// Cycle counter, incremented by `start_marking`. `sweep_partition` keeps
/// every record whose epoch is at or above this value.
gc_epoch: AtomicU64,
/// Queue of references that still have to be visited during marking.
gray: Mutex<VecDeque<ContentHashRef>>,
/// Hashes already visited in the current cycle; `sweep_partition` keeps them.
black: DashSet<ContentHash>,
}
/// Tunable settings for scheduling garbage collection.
///
/// NOTE(review): none of the code visible here reads these fields. The
/// per-field notes are inferred from the names and types only; confirm them
/// against the scheduler that consumes the policy.
#[derive(Debug, Clone)]
pub struct GcPolicy {
/// Presumably a record count that triggers a collection; `None` looks like "disabled" — TODO confirm.
pub record_threshold: Option<usize>,
/// Presumably a commit count that triggers a collection; `None` looks like "disabled" — TODO confirm.
pub commit_threshold: Option<usize>,
/// Presumably an idle period after which a collection may run; `None` looks like "disabled" — TODO confirm.
pub idle_timeout: Option<Duration>,
/// Presumably the interval between unconditional collections — TODO confirm.
pub periodic_interval: Duration,
/// Presumably the `budget` handed to each `mark_tick_partition` call — TODO confirm.
pub mark_budget: usize,
/// Presumably a per-step limit for reclaiming records; its consumer is not visible here — TODO confirm.
pub reap_budget: usize,
}
impl Default for GcPolicy {
    /// Builds the stock policy: thresholds of 100 000 records and 1 000
    /// commits, a 5 second idle timeout, a 60 second periodic interval, and
    /// budgets of 1 000 (mark) and 10 000 (reap).
    fn default() -> Self {
        let idle_timeout = Duration::from_secs(5);
        let periodic_interval = Duration::from_secs(60);

        Self {
            record_threshold: Some(100_000),
            commit_threshold: Some(1_000),
            idle_timeout: Some(idle_timeout),
            periodic_interval,
            mark_budget: 1_000,
            reap_budget: 10_000,
        }
    }
}
impl GarbageCollector {
/// Creates a collector in the idle phase with epoch 0, an empty gray queue
/// and an empty black set.
pub fn new() -> Self {
    let phase = AtomicU8::new(GC_PHASE_IDLE);
    let gc_epoch = AtomicU64::new(0);
    let gray = Mutex::new(VecDeque::new());
    let black = DashSet::new();

    Self {
        phase,
        gc_epoch,
        gray,
        black,
    }
}
/// Begins a new collection cycle seeded with `roots`.
///
/// On success the phase moves from idle to marking, the epoch counter is
/// incremented, the black set is cleared and the gray queue is replaced by
/// `roots`.
///
/// Returns `false`, leaving all state untouched, when the collector is not
/// idle; returns `true` when this call started the cycle.
pub fn start_marking(
    &self,
    roots: impl Iterator<Item = ContentHashRef>,
) -> bool {
    // Take the gray lock *before* publishing the marking phase and keep it
    // until the roots are queued. Anything that inspects the queue does so
    // under this lock, so nobody can observe "marking, but queue still
    // empty" and wrongly conclude that marking has finished.
    let mut gray = self.gray.lock();

    let claimed = self.phase.compare_exchange(
        GC_PHASE_IDLE,
        GC_PHASE_MARKING,
        Ordering::SeqCst,
        Ordering::SeqCst,
    );
    if claimed.is_err() {
        return false;
    }

    self.gc_epoch.fetch_add(1, Ordering::SeqCst);
    self.black.clear();
    gray.clear();
    gray.extend(roots);
    true
}
/// Runs one bounded marking step for partition `Part`.
///
/// Repeatedly removes the oldest gray reference whose partition is
/// `Part::KEY`, marks its hash black and, if the record is present in the
/// partition store, queues the references it reports through
/// `collect_references` that are not black yet. At most `budget` previously
/// unmarked items are processed; entries that were already black are dropped
/// without counting against the budget.
///
/// Returns `true` when the collector is not in the marking phase, or when the
/// gray queue (across all partitions) is empty after this step; `false`
/// otherwise.
pub fn mark_tick_partition<P, Part>(
    &self,
    stores: &P::Stores,
    budget: usize,
) -> bool
where
    P: Partitions,
    Part: Partition + 'static,
    Part::Record: Record + CollectReferences<P>,
    P::Stores: HasPartition<Part>,
{
    if self.phase.load(Ordering::SeqCst) != GC_PHASE_MARKING {
        return true;
    }

    let part_store = <P::Stores as HasPartition<Part>>::store(stores);
    let mut processed = 0;
    let mut new_gray = Vec::new();

    while processed < budget {
        // Hold the queue lock only long enough to pop one item so that
        // record traversal below never runs under the lock.
        let item = {
            let mut gray = self.gray.lock();
            let pos = gray.iter().position(|r| r.partition == Part::KEY);
            pos.and_then(|i| gray.remove(i))
        };
        let Some(item) = item else {
            break;
        };

        // `insert` reports whether the hash was newly added, which makes
        // "test and mark" a single step: two concurrent tickers can no
        // longer both decide to traverse the same hash.
        if !self.black.insert(item.hash) {
            continue;
        }
        processed += 1;

        if let Some(record_ref) = part_store.get(&item.hash)
            && let Some(record) = record_ref.record()
        {
            let mut collector = GcRefCollector::<P>::new();
            record.collect_references(&mut collector);
            new_gray.extend(
                collector
                    .refs
                    .into_iter()
                    .filter(|child| !self.black.contains(&child.hash)),
            );
        }
    }

    // NOTE(review): children discovered above are invisible to other threads
    // until they are pushed here; confirm callers never run `finish_marking`
    // concurrently with a tick that is still in flight.
    //
    // Publish the children and read the queue state under one lock so the
    // returned flag reflects the queue exactly as this call left it.
    let mut gray = self.gray.lock();
    gray.extend(new_gray);
    gray.is_empty()
}
pub fn finish_marking(&self) -> bool {
let gray = self.gray.lock();
if gray.is_empty() {
self.phase.store(GC_PHASE_SWEEPING, Ordering::SeqCst);
true
} else {
false
}
}
/// Removes unreachable records from partition `Part`.
///
/// Does nothing unless the collector is in the sweeping phase. Otherwise a
/// record is kept when its epoch is at or above the collector's current
/// epoch, or when its hash is in the black set; every other record is
/// dropped by the store's `retain`.
pub fn sweep_partition<P, Part>(&self, stores: &P::Stores)
where
    P: Partitions,
    Part: Partition + 'static,
    P::Stores: HasPartition<Part>,
{
    let current = self.phase.load(Ordering::SeqCst);
    if current != GC_PHASE_SWEEPING {
        return;
    }

    let cutoff = GenerationEpoch::new(self.gc_epoch.load(Ordering::SeqCst));
    let marked = &self.black;
    let store = <P::Stores as HasPartition<Part>>::store(stores);

    // The epoch test comes first; the black set is consulted only for
    // records older than the cutoff.
    store.retain(|hash, record_epoch| record_epoch >= cutoff || marked.contains(&hash));
}
/// Ends the current cycle: stores the idle phase, then clears the black set.
///
/// The phase is stored unconditionally, whatever phase the collector was in.
///
/// NOTE(review): the phase becomes idle before the black set is cleared, so a
/// `start_marking` that wins the race in between could have freshly marked
/// hashes wiped by the `clear` below — confirm callers serialize cycle
/// transitions.
pub fn finish_sweep(&self) {
self.phase.store(GC_PHASE_IDLE, Ordering::SeqCst);
self.black.clear();
}
/// Returns `true` while the collector is in the marking phase.
pub fn is_marking(&self) -> bool {
    let current = self.phase.load(Ordering::SeqCst);
    current == GC_PHASE_MARKING
}
/// Returns `true` whenever the collector is in any phase other than idle.
pub fn is_active(&self) -> bool {
    let idle = self.phase.load(Ordering::SeqCst) == GC_PHASE_IDLE;
    !idle
}
/// Returns `true` when `hash` is currently in the black set.
pub fn is_black(&self, hash: &ContentHash) -> bool {
    let marked = &self.black;
    marked.contains(hash)
}
/// Queues additional references for marking.
///
/// Ignored unless the collector is in the marking phase. References whose
/// hash is already black are skipped; the rest are appended to the back of
/// the gray queue while its lock is held.
pub fn add_to_gray(&self, refs: impl Iterator<Item = ContentHashRef>) {
    let current = self.phase.load(Ordering::SeqCst);
    if current == GC_PHASE_MARKING {
        let mut queue = self.gray.lock();
        queue.extend(refs.filter(|candidate| !self.black.contains(&candidate.hash)));
    }
}
/// Returns the collector's current epoch counter wrapped in a `GenerationEpoch`.
pub fn epoch(&self) -> GenerationEpoch {
    let raw = self.gc_epoch.load(Ordering::SeqCst);
    GenerationEpoch::new(raw)
}
/// Returns the current phase as a `GcPhase`.
///
/// Any stored value other than the marking or sweeping encodings is
/// reported as `GcPhase::Idle`.
pub fn phase(&self) -> GcPhase {
    let raw = self.phase.load(Ordering::SeqCst);
    match raw {
        | GC_PHASE_MARKING => GcPhase::Marking,
        | GC_PHASE_SWEEPING => GcPhase::Sweeping,
        // Covers the idle encoding as well as any unrecognized value.
        | _ => GcPhase::Idle,
    }
}
/// Returns the number of references currently waiting in the gray queue.
pub fn gray_queue_len(&self) -> usize {
    let queue = self.gray.lock();
    queue.len()
}
/// Returns the number of hashes currently in the black set.
pub fn black_set_len(&self) -> usize {
    let marked = &self.black;
    marked.len()
}
}
impl Default for GarbageCollector {
fn default() -> Self {
Self::new()
}
}
/// Public view of the collector's phase, as returned by `GarbageCollector::phase()`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GcPhase {
/// No collection cycle is running.
Idle,
/// Gray references are being processed and hashes added to the black set.
Marking,
/// Partition stores may be swept against the black set.
Sweeping,
}
/// `References` sink used during marking: records every handle it is given
/// as a `ContentHashRef`.
struct GcRefCollector<P: Partitions> {
/// References collected so far, in the order they were added.
refs: Vec<ContentHashRef>,
/// Ties the collector to the partition set `P` without storing a value of it.
_marker: PhantomData<P>,
}
impl<P: Partitions> GcRefCollector<P> {
    /// Creates a collector that has not recorded any references yet.
    fn new() -> Self {
        let refs = Vec::new();
        Self {
            refs,
            _marker: PhantomData,
        }
    }
}
impl<P: Partitions> References<P> for GcRefCollector<P> {
    /// Records `handle` as a reference into partition `Part`, pairing
    /// `Part::KEY` with the handle's content hash.
    fn add<Part: Partition>(
        &mut self,
        handle: RecordHandle<Part>,
    ) where
        P::Stores: HasPartition<Part>,
    {
        let hash = handle.content_hash();
        let reference = ContentHashRef::new(Part::KEY, hash);
        self.refs.push(reference);
    }
}