use {
crate::{
ContentHash,
Ident,
database::{
GenerationEpoch,
partitions::{CollectCascadingRefs, RefcountOps},
storage::Partitions,
},
},
parking_lot::Mutex,
std::{
collections::VecDeque,
sync::Arc,
},
};
#[derive(Debug, Clone)]
pub struct DeferredDecrement {
pub partition: Ident,
pub hash: ContentHash,
pub from_epoch: GenerationEpoch,
}
pub struct Reaper<P: Partitions> {
queue: Mutex<VecDeque<DeferredDecrement>>,
stores: Arc<P::Stores>,
}
impl<P: Partitions> Reaper<P>
where
P::Stores: RefcountOps,
{
pub fn new(stores: Arc<P::Stores>) -> Self {
Self {
queue: Mutex::new(VecDeque::new()),
stores,
}
}
pub fn queue_decrement(&self, partition: Ident, hash: ContentHash, epoch: GenerationEpoch) {
let mut queue = self.queue.lock();
queue.push_back(DeferredDecrement {
partition,
hash,
from_epoch: epoch,
});
}
pub fn queue_decrements(&self, decrements: impl IntoIterator<Item = DeferredDecrement>) {
let mut queue = self.queue.lock();
queue.extend(decrements);
}
pub fn queue_len(&self) -> usize {
self.queue.lock().len()
}
pub fn is_empty(&self) -> bool {
self.queue.lock().is_empty()
}
pub fn stores(&self) -> &P::Stores {
&self.stores
}
}
impl<P: Partitions> Reaper<P>
where
P::Stores: RefcountOps + CollectCascadingRefs<P>,
{
pub fn reap(&self, oldest_running_epoch: GenerationEpoch, budget: usize) -> usize {
let mut removed_count = 0;
let mut processed = 0;
loop {
if processed >= budget {
break;
}
let dd = {
let mut queue = self.queue.lock();
match queue.front() {
Some(dd) if dd.from_epoch < oldest_running_epoch => {
queue.pop_front()
}
_ => None,
}
};
let Some(dd) = dd else {
break;
};
processed += 1;
if self.do_decrement(dd) {
removed_count += 1;
}
}
removed_count
}
fn do_decrement(&self, dd: DeferredDecrement) -> bool {
let new_count = self.stores.decrement_refcount(dd.partition, dd.hash);
if new_count == 0 {
let cascaded_refs = self.stores.collect_cascading_refs(dd.partition, dd.hash);
if !cascaded_refs.is_empty() {
let mut queue = self.queue.lock();
for (child_partition, child_hash) in cascaded_refs {
queue.push_back(DeferredDecrement {
partition: child_partition,
hash: child_hash,
from_epoch: dd.from_epoch,
});
}
}
self.stores.remove_record(dd.partition, dd.hash);
true
} else {
false
}
}
}
impl<P: Partitions> Reaper<P> {
pub fn queue_len_inner(&self) -> usize {
self.queue.lock().len()
}
pub fn is_empty_inner(&self) -> bool {
self.queue.lock().is_empty()
}
}
impl<P: Partitions> std::fmt::Debug for Reaper<P> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Reaper")
.field("queue_len", &self.queue_len_inner())
.finish_non_exhaustive()
}
}
pub struct RefCountIncrementer<'a, P: Partitions>
where
P::Stores: RefcountOps,
{
stores: &'a P::Stores,
}
impl<'a, P: Partitions> RefCountIncrementer<'a, P>
where
P::Stores: RefcountOps,
{
pub fn new(stores: &'a P::Stores) -> Self {
Self { stores }
}
}
impl<'a, P: Partitions> crate::record::References<P> for RefCountIncrementer<'a, P>
where
P::Stores: RefcountOps,
{
fn add<Part: crate::database::Partition>(&mut self, handle: crate::database::RecordHandle<Part>)
where
P::Stores: crate::database::HasPartition<Part>,
{
self
.stores
.increment_refcount(Part::KEY, handle.content_hash());
}
}
pub struct RefCountDecrementer<'a> {
decrements: &'a mut Vec<DeferredDecrement>,
epoch: GenerationEpoch,
}
impl<'a> RefCountDecrementer<'a> {
pub fn new(decrements: &'a mut Vec<DeferredDecrement>, epoch: GenerationEpoch) -> Self {
Self { decrements, epoch }
}
}
impl<P: Partitions> crate::record::References<P> for RefCountDecrementer<'_> {
fn add<Part: crate::database::Partition>(&mut self, handle: crate::database::RecordHandle<Part>)
where
P::Stores: crate::database::HasPartition<Part>,
{
self.decrements.push(DeferredDecrement {
partition: Part::KEY,
hash: handle.content_hash(),
from_epoch: self.epoch,
});
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn deferred_decrement_stores_fields() {
let dd = DeferredDecrement {
partition: Ident::new("test"),
hash: ContentHash::new(&[1, 2, 3]),
from_epoch: GenerationEpoch::new(42),
};
assert_eq!(dd.partition, Ident::new("test"));
assert_eq!(dd.from_epoch, GenerationEpoch::new(42));
}
}