mod rcmap;
pub use rcmap::{ChildRef, RcMap};
use serialize::Serializable;
use crate::arena::ArenaKey;
use crate::db::DB;
use base_crypto::cost_model::{CostDuration, RunningCost};
use std::collections::BTreeSet;
/// Aggregated accounting for one write/delete cost-computation pass.
///
/// Produced by [`initial_write_delete_costs`] and
/// [`incremental_write_delete_costs`]; combine the byte/CPU figures via
/// [`WriteDeleteResults::running_cost`].
pub struct WriteDeleteResults<D: DB> {
    /// Total serialized bytes charged for newly written keys.
    pub bytes_written: u64,
    /// Total serialized bytes credited for keys removed by GC.
    pub bytes_deleted: u64,
    /// Number of keys newly written (added to the charged set).
    pub nodes_written: u64,
    /// Number of keys removed (garbage collected).
    pub nodes_deleted: u64,
    /// CPU cost of performing the write/delete processing itself.
    pub processing_cost: RunningCost,
    /// Reference-count map after applying the writes and deletions.
    pub updated_charged_keys: RcMap<D>,
}
/// Computes write/delete costs for a fresh state rooted at `r0`.
///
/// Starting from an empty reference-count map, every key reachable from the
/// roots counts as a new write; nothing can be deleted yet, so the removed
/// set is empty.
pub fn initial_write_delete_costs<D: DB>(
    r0: &BTreeSet<ArenaKey<D::Hasher>>,
    cpu_cost: impl Fn(u64, u64) -> RunningCost,
) -> WriteDeleteResults<D> {
    let empty_map = RcMap::default();
    let written = get_writes(&empty_map, r0);
    let charged = update_rcmap(&empty_map, &written);
    WriteDeleteResults::new(written, BTreeSet::new(), charged, cpu_cost)
}
/// Computes write/delete costs for transitioning a charged state `k0` to the
/// new root set `r1`.
///
/// New writes are the keys reachable from `r1` that are not already charged
/// in `k0`. The CPU cost of those writes (via `cpu_cost`) bounds, through
/// `gc_limit`, how many garbage-collection steps may run afterwards.
pub fn incremental_write_delete_costs<D: DB>(
    k0: &RcMap<D>,
    r1: &BTreeSet<ArenaKey<D::Hasher>>,
    cpu_cost: impl Fn(u64, u64) -> RunningCost,
    gc_limit: impl FnOnce(RunningCost) -> usize,
) -> WriteDeleteResults<D> {
    let written = get_writes(k0, r1);
    let write_cost = cpu_cost(written.len() as u64, 0);
    let charged = update_rcmap(k0, &written);
    let (charged, deleted) = gc_rcmap(&charged, r1, gc_limit(write_cost));
    WriteDeleteResults::new(written, deleted, charged, cpu_cost)
}
/// Sums the storage footprint, in bytes, of the given arena keys.
///
/// `Ref` keys are looked up in the backend and charged their serialized size
/// plus a flat per-node overhead. `Direct` keys are charged twice their own
/// serialized size.
///
/// # Panics
/// Panics if a `Ref` key is missing from the arena backend.
fn compute_bytes_from_keys<D: DB>(keys: &BTreeSet<ArenaKey<D::Hasher>>) -> u64 {
    let arena = &crate::storage::default_storage::<D>().arena;
    arena.with_backend(|backend| {
        keys.iter()
            .map(|key| {
                match key {
                    ArenaKey::Ref(key) => {
                        backend
                            .get(key)
                            .expect("key should exist in arena when computing bytes")
                            .size() as u64
                            // Flat per-node overhead. NOTE(review): presumably
                            // a 32-byte hash key plus a 4-byte length/metadata
                            // field — TODO confirm against the backend format.
                            + 32 + 4
                    }
                    // NOTE(review): the factor of 2 looks like the key being
                    // charged both as key and as stored value — verify.
                    ArenaKey::Direct(_) => key.serialized_size() as u64 * 2,
                }
            })
            .sum()
    })
}
impl<D: DB> WriteDeleteResults<D> {
    /// Assembles the cost summary from the sets of added and removed keys,
    /// charging byte counts from the arena and CPU cost via `cpu_cost`.
    fn new(
        keys_added: BTreeSet<ArenaKey<D::Hasher>>,
        keys_removed: BTreeSet<ArenaKey<D::Hasher>>,
        new_charged_keys: RcMap<D>,
        cpu_cost: impl Fn(u64, u64) -> RunningCost,
    ) -> Self {
        let written = keys_added.len() as u64;
        let deleted = keys_removed.len() as u64;
        let processing_cost = cpu_cost(written, deleted);
        Self {
            bytes_written: compute_bytes_from_keys::<D>(&keys_added),
            bytes_deleted: compute_bytes_from_keys::<D>(&keys_removed),
            nodes_written: written,
            nodes_deleted: deleted,
            processing_cost,
            updated_charged_keys: new_charged_keys,
        }
    }
    /// Total running cost: the storage byte deltas combined with the CPU
    /// processing cost (no read/compute time of its own).
    pub fn running_cost(&self) -> RunningCost {
        let storage_cost = RunningCost {
            read_time: CostDuration::ZERO,
            compute_time: CostDuration::ZERO,
            bytes_written: self.bytes_written,
            bytes_deleted: self.bytes_deleted,
        };
        storage_cost + self.processing_cost
    }
}
/// Collects every key reachable from `roots` that is not already charged in
/// `rcmap` — i.e. the set of keys a write of these roots would newly store.
///
/// Traversal stops at keys present in `rcmap`: their subtrees are already
/// accounted for. The result is a set, so traversal order does not affect it.
///
/// # Panics
/// Panics if children of a `Ref` key cannot be loaded from the arena.
pub fn get_writes<D: DB>(
    rcmap: &RcMap<D>,
    roots: &BTreeSet<ArenaKey<D::Hasher>>,
) -> BTreeSet<ArenaKey<D::Hasher>> {
    let arena = &crate::storage::default_storage::<D>().arena;
    let mut pending: Vec<ArenaKey<D::Hasher>> = roots.iter().cloned().collect();
    pending.sort();
    let mut discovered = BTreeSet::new();
    while let Some(key) = pending.pop() {
        // Skip keys that are already charged or already visited.
        if rcmap.contains(&key) || discovered.contains(&key) {
            continue;
        }
        match &key {
            ArenaKey::Ref(hash) => {
                let children = arena
                    .children(hash)
                    .expect("children for write update should be loadable");
                pending.extend(
                    children
                        .iter()
                        .flat_map(ArenaKey::refs)
                        .map(|r| ArenaKey::Ref(r.clone())),
                );
            }
            ArenaKey::Direct(_) => {
                pending.extend(key.refs().into_iter().map(|r| ArenaKey::Ref(r.clone())))
            }
        }
        discovered.insert(key);
    }
    discovered
}
/// Returns a copy of `rcmap` with every key in `keys_added` registered and
/// each referenced child's reference count incremented once per referencing
/// parent.
///
/// Added keys that receive no increments are still registered (with a count
/// of zero) because the increment map is seeded with all of `keys_added`.
/// `Direct` keys carry no reference count and are inserted as roots instead.
///
/// # Panics
/// Panics if children of a `Ref` key cannot be loaded from the arena.
#[must_use]
pub fn update_rcmap<D: DB>(
    rcmap: &RcMap<D>,
    keys_added: &BTreeSet<ArenaKey<D::Hasher>>,
) -> RcMap<D> {
    let arena = &crate::storage::default_storage::<D>().arena;
    let mut rcmap = rcmap.clone();
    // Seed every added key with 0 so it gets registered even when nothing
    // references it.
    let mut inc_map = keys_added
        .iter()
        .map(|k| (k.clone(), 0))
        .collect::<std::collections::BTreeMap<_, _>>();
    for key in keys_added {
        match key {
            ArenaKey::Ref(key) => {
                let children = arena.children(key).expect("children should be loadable");
                for child in children
                    .iter()
                    .flat_map(ArenaKey::refs)
                    .map(|r| ArenaKey::Ref(r.clone()))
                {
                    *inc_map.entry(child).or_default() += 1;
                }
            }
            ArenaKey::Direct(_) => {
                for child in key.refs().into_iter().map(|r| ArenaKey::Ref(r.clone())) {
                    *inc_map.entry(child).or_default() += 1;
                }
            }
        }
    }
    // BTreeMap iteration is already key-ordered, so updates are applied
    // deterministically without materializing and re-sorting a Vec.
    for (k, by) in inc_map {
        match &k {
            ArenaKey::Ref(r) => {
                let old_rc = rcmap.get_rc(&k).unwrap_or(0);
                rcmap = rcmap.modify_rc(r, old_rc + by);
            }
            ArenaKey::Direct(_) => rcmap = rcmap.ins_root(k),
        }
    }
    rcmap
}
/// Garbage-collects up to `step_limit` keys that are unreachable from
/// `roots`, returning the updated map and the set of removed keys.
///
/// The sweep is resumable: a later call with the returned map continues
/// where a step-limited call stopped (exercised by the tests below).
///
/// # Panics
/// Panics if children cannot be loaded, or if a removed key still has a
/// nonzero reference count when `remove_unreachable_key` is called.
#[must_use]
pub fn gc_rcmap<D: DB>(
    orig_rcmap: &RcMap<D>,
    roots: &BTreeSet<ArenaKey<D::Hasher>>,
    step_limit: usize,
) -> (RcMap<D>, BTreeSet<ArenaKey<D::Hasher>>) {
    let arena = &crate::storage::default_storage::<D>().arena;
    let mut rcmap = orig_rcmap.clone();
    let mut keys_removed = BTreeSet::new();
    let mut step = 0;
    // Seed the sweep with keys the map already considers unreachable,
    // excluding the live roots; `queue` holds keys discovered during this
    // sweep whose pending decrements exhaust their reference count.
    let mut storage_queue = orig_rcmap.get_unreachable_keys_not_in(roots);
    let mut queue: Vec<ArenaKey<D::Hasher>> = Vec::new();
    // Original reference count per child, read from `rcmap` once and cached.
    let mut rc_cache = std::collections::BTreeMap::new();
    // Pending decrement count per child; applied in a batch after the sweep.
    let mut update_queue = std::collections::BTreeMap::new();
    while let Some(key) = storage_queue.next().or_else(|| queue.pop()) {
        // Each removed key counts as one GC step against the budget.
        if step >= step_limit {
            break;
        }
        step += 1;
        let children_refs: Box<dyn Iterator<Item = _>> = match &key {
            ArenaKey::Ref(key) => Box::new(
                arena
                    .children(key)
                    .expect("children should be loadable")
                    .into_iter()
                    .flat_map(|c| {
                        c.refs()
                            .into_iter()
                            .cloned()
                            .collect::<Vec<_>>()
                            .into_iter()
                    }),
            ),
            ArenaKey::Direct(_) => Box::new(key.refs().into_iter().cloned()),
        };
        for child in children_refs.map(|r| ArenaKey::Ref(r.clone())) {
            let existing = rc_cache
                .entry(child.clone())
                .or_insert_with(|| rcmap.get_rc(&child).unwrap_or(0));
            let sub = update_queue.entry(child.clone()).or_default();
            *sub += 1;
            // Once the pending decrements cover the child's whole original
            // count, the child itself becomes unreachable — unless it is a
            // root, which must survive.
            if *sub >= *existing && !roots.contains(&child) {
                queue.push(child.clone());
            }
        }
        keys_removed.insert(key);
    }
    // Apply all batched decrements. NOTE(review): `update_queue` is a
    // BTreeMap, so the collect + sort below is a no-op kept as written.
    let mut update_vec = update_queue.into_iter().collect::<Vec<_>>();
    update_vec.sort();
    for (key, update) in update_vec.into_iter() {
        match &key {
            ArenaKey::Ref(r) => {
                let original = rc_cache
                    .get(&key)
                    .expect("must have cached decremented key");
                // saturating_sub: a child may accumulate more decrements than
                // its cached count when multiple removed parents reference it.
                let updated = original.saturating_sub(update);
                rcmap = rcmap.modify_rc(r, updated);
            }
            ArenaKey::Direct(_) => rcmap = rcmap.rm_root(&key),
        }
    }
    // Finally drop the swept keys themselves from the map.
    let mut removed_vec = keys_removed.iter().collect::<Vec<_>>();
    removed_vec.sort();
    for key in removed_vec.into_iter() {
        rcmap = rcmap
            .remove_unreachable_key(key)
            .expect("keys in queue have rc == 0");
    }
    (rcmap, keys_removed)
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate as storage;
    use crate::arena::Sp;
    use crate::db::DB;
    use crate::storable::{Loader, SMALL_OBJECT_LIMIT};
    use crate::storage::set_default_storage;
    use crate::{DefaultDB, Storable};
    use derive_where::derive_where;
    use serialize::Tagged;
    use std::collections::BTreeMap;

    /// Test node whose `id` encodes a `(layer, index)` pair; padded with
    /// `_data` so nodes exceed the small-object inlining threshold.
    #[derive(Storable, Debug, Hash)]
    #[derive_where(Clone, PartialEq, Eq)]
    #[storable(db = D)]
    #[tag = "test_node[v1]"]
    struct Node<D: DB = DefaultDB> {
        id: u64,
        children: Vec<Sp<Node<D>, D>>,
        _data: [u8; SMALL_OBJECT_LIMIT],
    }

    impl<D: DB> Node<D> {
        fn new(id: (u8, u8), children: &[Sp<Node<D>, D>]) -> Sp<Node<D>, D> {
            let encoded_id = (id.0 as u64) * 256 + (id.1 as u64);
            Sp::new(Node {
                id: encoded_id,
                children: children.to_vec(),
                _data: [0; SMALL_OBJECT_LIMIT],
            })
        }
    }

    /// A built test DAG: arena keys by `(layer, index)` id, plus the root
    /// `Sp`s kept alive so the arena does not drop the nodes.
    struct Dag<D: DB = DefaultDB> {
        nodes: BTreeMap<(u8, u8), ArenaKey<D::Hasher>>,
        _roots: Vec<Sp<Node<D>, D>>,
    }

    /// Fixed adjacency list for the test DAG, keyed by `(layer, index)`.
    #[allow(clippy::type_complexity)]
    fn test_dag_adjacency() -> Vec<((u8, u8), Vec<(u8, u8)>)> {
        vec![
            ((0, 1), vec![(1, 1), (1, 2), (2, 1), (3, 1)]),
            ((0, 2), vec![(1, 2), (1, 3)]),
            ((1, 1), vec![(2, 1), (2, 2)]),
            ((1, 2), vec![(2, 2), (3, 1)]),
            ((1, 3), vec![(2, 3), (3, 2)]),
            ((2, 1), vec![(3, 1), (3, 2), (3, 3)]),
            ((2, 2), vec![(3, 2), (4, 1)]),
            ((2, 3), vec![(3, 3), (3, 4)]),
            ((3, 1), vec![(4, 1), (4, 2)]),
            ((3, 2), vec![(4, 1), (4, 2), (4, 3)]),
            ((3, 3), vec![]),
            ((3, 4), vec![(4, 3), (5, 1)]),
            ((4, 1), vec![(5, 1)]),
            ((4, 2), vec![(5, 1), (5, 2)]),
            ((4, 3), vec![(5, 2)]),
            ((5, 1), vec![]),
            ((5, 2), vec![]),
        ]
    }

    /// Builds the test DAG bottom-up (children before parents) so each
    /// node's `Sp` children already exist when the node is created.
    fn build_test_dag<D: DB>() -> Dag<D> {
        let adjacency = test_dag_adjacency();
        let mut nodes: BTreeMap<(u8, u8), Sp<Node<D>, D>> = BTreeMap::new();
        for ((layer, id), children_ids) in adjacency.iter().rev() {
            let node_id = (*layer, *id);
            let children: Vec<_> = children_ids
                .iter()
                .map(|child_id| nodes[child_id].clone())
                .collect();
            nodes.insert(node_id, Node::new(node_id, &children));
        }
        let mut arena_nodes = BTreeMap::new();
        for ((layer, id), node) in &nodes {
            arena_nodes.insert((*layer, *id), node.child_repr.clone());
        }
        Dag {
            nodes: arena_nodes,
            _roots: vec![nodes[&(0, 1)].clone(), nodes[&(0, 2)].clone()],
        }
    }

    /// Reference implementation: set of node ids reachable from `roots` in
    /// the adjacency list.
    fn compute_reachable_nodes(roots: &[(u8, u8)]) -> BTreeSet<(u8, u8)> {
        let adjacency = test_dag_adjacency();
        let mut reachable = BTreeSet::new();
        let mut queue: Vec<(u8, u8)> = roots.to_vec();
        while let Some(node_id) = queue.pop() {
            if !reachable.insert(node_id) {
                continue;
            }
            let children = adjacency
                .iter()
                .find(|(k, _)| *k == node_id)
                .expect("nodes must be in adjacency")
                .1
                .clone();
            queue.extend(children);
        }
        reachable
    }

    /// Reference implementation: expected reference count for every node in
    /// the subgraph reachable from `roots` (roots themselves may be 0).
    fn get_subgraph_rcs(roots: &[(u8, u8)]) -> BTreeMap<(u8, u8), u64> {
        let adjacency = test_dag_adjacency();
        let reachable = compute_reachable_nodes(roots);
        let mut rcs = BTreeMap::new();
        for node_id in &reachable {
            rcs.insert(*node_id, 0);
        }
        for (parent_id, children) in &adjacency {
            if reachable.contains(parent_id) {
                for child_id in children {
                    let rc = rcs.get_mut(child_id).unwrap();
                    *rc += 1;
                }
            }
        }
        rcs
    }

    /// Maps `(layer, index)` ids to arena keys. Rebuilds the DAG each call;
    /// keys are content-addressed, so they match across builds.
    fn to_keys<'a, I>(node_ids: I) -> BTreeSet<ArenaKey<crate::DefaultHasher>>
    where
        I: IntoIterator<Item = &'a (u8, u8)>,
    {
        let dag = build_test_dag::<DefaultDB>();
        node_ids
            .into_iter()
            .map(|id| dag.nodes[id].clone())
            .collect()
    }

    use super::rcmap::tests::get_rcmap_descendants;

    #[test]
    fn get_writes() {
        let _dag = build_test_dag::<DefaultDB>();
        let k0: RcMap<DefaultDB> = RcMap::default();
        let roots = [(0, 1)];
        let r1 = to_keys(roots.iter());
        // With an empty rcmap the write set is the full reachable set.
        let writes = super::get_writes(&k0, &r1.clone().into_iter().collect());
        let expected_reachable = compute_reachable_nodes(&roots);
        let expected_keys = to_keys(expected_reachable.iter());
        assert_eq!(
            writes,
            expected_keys.into_iter().collect(),
            "Write set should contain exactly the reachable nodes"
        );
        // Pre-charge layers 3..=5; those keys must be excluded from writes.
        let k0_node_ids: Vec<_> = test_dag_adjacency()
            .iter()
            .map(|((layer, id), _)| (*layer, *id))
            .filter(|(layer, _)| *layer >= 3 && *layer <= 5)
            .collect();
        let k0_keys = to_keys(k0_node_ids.iter());
        let k0_writes =
            super::get_writes::<DefaultDB>(&RcMap::default(), &k0_keys.into_iter().collect());
        let k0: RcMap<DefaultDB> = super::update_rcmap(&RcMap::default(), &k0_writes);
        let writes = super::get_writes(&k0, &r1.into_iter().collect());
        let reachable_from_r1 = compute_reachable_nodes(&roots);
        let k0_set: BTreeSet<_> = k0_node_ids.iter().copied().collect();
        let expected_writes = &reachable_from_r1 - &k0_set;
        let expected_writes_keys = to_keys(expected_writes.iter());
        assert_eq!(
            writes,
            expected_writes_keys.into_iter().collect(),
            "Write set should exclude K0 nodes"
        );
        // Multiple roots: write set is the union of each root's reachable set.
        let multi_roots = [(0, 1), (0, 2)];
        let r1_multi = to_keys(multi_roots.iter());
        let writes_multi =
            super::get_writes::<DefaultDB>(&RcMap::default(), &r1_multi.into_iter().collect());
        let expected_multi = compute_reachable_nodes(&multi_roots);
        let expected_multi_keys = to_keys(expected_multi.iter());
        assert_eq!(
            writes_multi,
            expected_multi_keys.into_iter().collect(),
            "Multiple roots should give union of reachable sets"
        );
    }

    #[test]
    fn update_rcmap() {
        let dag = build_test_dag::<DefaultDB>();
        let roots = [(0, 1)];
        let reachable = compute_reachable_nodes(&roots);
        let k0: RcMap<DefaultDB> = RcMap::default();
        let writes = to_keys(reachable.iter());
        let k1 = super::update_rcmap(&k0, &writes.into_iter().collect());
        // Every reachable node's rc must match the reference computation.
        let expected_rcs = get_subgraph_rcs(&roots);
        for (node_id, expected_rc) in expected_rcs {
            let actual_rc = k1.get_rc(&dag.nodes[&node_id].clone()).unwrap();
            assert_eq!(
                actual_rc, expected_rc,
                "Node {:?} should have rc={}, got {}",
                node_id, expected_rc, actual_rc
            );
        }
    }

    #[test]
    fn gc_rcmap() {
        let dag = build_test_dag::<DefaultDB>();
        let full_roots = [(0, 1), (0, 2)];
        let full_reachable = compute_reachable_nodes(&full_roots);
        let all_writes = to_keys(full_reachable.iter());
        let k0: RcMap<DefaultDB> =
            super::update_rcmap(&RcMap::default(), &all_writes.into_iter().collect());
        // Shrink the root set to (0,1): everything only reachable via (0,2)
        // must be collected.
        let limited_roots = [(0, 1)];
        let roots = to_keys(limited_roots.iter());
        let step_limit = 1000;
        let (k1, removed) = super::gc_rcmap(&k0, &roots.clone().into_iter().collect(), step_limit);
        let kept_nodes = compute_reachable_nodes(&limited_roots);
        let expected_removed: BTreeSet<_> = &full_reachable - &kept_nodes;
        assert_eq!(
            removed.len(),
            expected_removed.len(),
            "Should remove exactly the unreachable nodes"
        );
        for node_id in &expected_removed {
            assert!(
                removed.contains(&dag.nodes[node_id].clone()),
                "Node {:?} should be removed as unreachable",
                node_id
            );
            assert_eq!(
                k1.get_rc(&dag.nodes[node_id]),
                None,
                "Removed node {:?} should not have rc in new map",
                node_id
            );
        }
        for node_id in &kept_nodes {
            assert!(
                !removed.contains(&dag.nodes[node_id].clone()),
                "Node {:?} should not be removed as it's reachable",
                node_id
            );
            assert!(
                k1.get_rc(&dag.nodes[node_id].clone()).is_some(),
                "Remaining node {:?} should have rc in new map",
                node_id
            );
        }
        // A step-limited GC removes exactly `step_limit` nodes and can be
        // resumed from the returned map to finish the job.
        let (k2, removed2) = super::gc_rcmap(&k0, &roots.clone().into_iter().collect(), 2);
        assert!(
            removed2.len() == 2,
            "With step_limit=2, should remove 2 nodes"
        );
        let (_k3, removed3) =
            super::gc_rcmap(&k2, &roots.into_iter().collect(), expected_removed.len());
        let total_removed: BTreeSet<_> = removed2.union(&removed3).cloned().collect();
        assert!(
            total_removed.len() == expected_removed.len(),
            "Resuming GC should make progress"
        );
        // With no roots at all, repeated single-step GC must eventually
        // drain every node.
        let empty_roots = BTreeSet::new();
        let mut current_rcmap = k0.clone();
        let mut total_single_step_removed = BTreeSet::new();
        loop {
            let (new_rcmap, removed_single) = super::gc_rcmap(&current_rcmap, &empty_roots, 1);
            if removed_single.is_empty() {
                break;
            }
            total_single_step_removed.extend(removed_single);
            current_rcmap = new_rcmap;
        }
        assert_eq!(
            total_single_step_removed.len(),
            full_reachable.len(),
            "Single-step GC should eventually remove all nodes with empty root set"
        );
    }

    /// Regression check: GC must not panic when only the RcMap itself keeps
    /// the node data alive (no `Sp` roots held by the caller).
    #[test]
    fn rcmap_survives_gc_with_only_references() {
        use crate::db::InMemoryDB;
        use crate::storage::WrappedDB;
        use std::collections::BTreeSet;
        struct Tag;
        type W = WrappedDB<InMemoryDB, Tag>;
        set_default_storage(crate::Storage::<W>::default).unwrap();
        let rcmap: RcMap<W> = {
            // `dag` (and its `_roots`) is dropped at the end of this block.
            let dag = build_test_dag::<W>();
            let full_roots = [(0, 1), (0, 2)];
            let all_reachable = compute_reachable_nodes(&full_roots);
            let all_writes: BTreeSet<_> = all_reachable
                .iter()
                .map(|id| dag.nodes[id].clone())
                .collect();
            super::update_rcmap(&RcMap::default(), &all_writes.into_iter().collect())
        };
        let empty_roots = BTreeSet::new();
        let (_final_rcmap, _removed) = super::gc_rcmap(&rcmap, &empty_roots, 1000);
    }

    #[test]
    fn write_delete_costs() {
        use rand::rngs::StdRng;
        use rand::{Rng, SeedableRng};
        use std::collections::{BTreeMap, BTreeSet};
        let dag = build_test_dag::<DefaultDB>();
        let all_node_ids: Vec<(u8, u8)> = dag.nodes.keys().cloned().collect();
        // Deterministic seed so failures reproduce.
        let mut rng = StdRng::seed_from_u64(42);
        // Generate random root sets with a size distribution skewed small.
        let mut root_sets = Vec::new();
        for _ in 0..100 {
            let root_set_size = {
                let p = rng.gen_range(0..100);
                if p < 40 {
                    rng.gen_range(0..=3)
                } else if p < 80 {
                    rng.gen_range(4..=8)
                } else if p < 95 {
                    rng.gen_range(9..=15)
                } else {
                    rng.gen_range(16..=25.min(all_node_ids.len()))
                }
            };
            let mut selected_nodes = BTreeSet::new();
            while selected_nodes.len() < root_set_size {
                let idx = rng.gen_range(0..all_node_ids.len());
                selected_nodes.insert(all_node_ids[idx]);
            }
            root_sets.push(selected_nodes.into_iter().collect::<Vec<_>>());
        }
        let root_sets_as_keys: Vec<BTreeSet<_>> = root_sets.iter().map(to_keys).collect();
        // Initial costs: rcs must match the reference computation and every
        // root must be reachable from the resulting RcMap.
        for i in 0..root_sets.len() {
            let results = super::initial_write_delete_costs::<DefaultDB>(
                &root_sets_as_keys[i].clone().into_iter().collect(),
                |_, _| Default::default(),
            );
            let expected_rcs = get_subgraph_rcs(&root_sets[i]);
            let actual_rcs = results.updated_charged_keys.get_rcs();
            let expected_rcs_as_keys: BTreeMap<_, _> = expected_rcs
                .into_iter()
                .map(|(node_id, rc)| (dag.nodes[&node_id].clone(), rc))
                .collect();
            assert_eq!(
                actual_rcs, expected_rcs_as_keys,
                "Initial costs for root set {} should have correct reference counts",
                i
            );
            let rcmap_descendants = get_rcmap_descendants(&results.updated_charged_keys);
            for root_key in &root_sets_as_keys[i] {
                assert!(
                    rcmap_descendants.contains(root_key),
                    "Root key {:?} should be a descendant of RcMap after initial_write_delete_costs",
                    root_key
                );
            }
        }
        // Incremental transitions: walk through the root sets, carrying the
        // charged-key map forward; invariants must hold after each step.
        let initial_roots = &root_sets_as_keys[0];
        let initial_results =
            super::initial_write_delete_costs(initial_roots, |_, _| Default::default());
        let mut current_charged_keys = initial_results.updated_charged_keys;
        for i in 1..root_sets.len() {
            let next_roots = &root_sets_as_keys[i];
            let results = super::incremental_write_delete_costs::<DefaultDB>(
                &current_charged_keys,
                next_roots,
                |_, _| Default::default(),
                |_| 1000,
            );
            let expected_rcs = get_subgraph_rcs(&root_sets[i]);
            let actual_rcs = results.updated_charged_keys.get_rcs();
            let expected_rcs_as_keys: BTreeMap<_, _> = expected_rcs
                .into_iter()
                .map(|(node_id, rc)| (dag.nodes[&node_id].clone(), rc))
                .collect();
            assert_eq!(
                actual_rcs, expected_rcs_as_keys,
                "Incremental transition {} should have correct reference counts",
                i
            );
            let rcmap_descendants = get_rcmap_descendants(&results.updated_charged_keys);
            for root_key in next_roots {
                assert!(
                    rcmap_descendants.contains(root_key),
                    "Root key {:?} should be a descendant of RcMap after incremental_write_delete_costs",
                    root_key
                );
            }
            current_charged_keys = results.updated_charged_keys;
        }
    }
}