#![allow(rustdoc::private_intra_doc_links)]
use crate::{
WellBehavedHasher,
arena::{ArenaHash, ArenaKey},
cache::Cache,
db::{DB, Update},
};
use rand::distributions::{Distribution, Standard};
use serialize::{Deserializable, Serializable};
use std::{
cell::RefCell,
collections::{HashMap, HashSet},
};
/// A pending change to a node's counters: how far its reference count and its root
/// count should move once the change is applied.
#[derive(PartialEq, Debug, Clone, Copy)]
struct Delta {
    ref_delta: i32,
    root_delta: i32,
}
impl Delta {
    /// Builds a delta that only moves the reference count.
    ///
    /// # Panics
    /// Panics if `ref_delta` is zero.
    fn new_ref_delta(ref_delta: i32) -> Self {
        assert!(ref_delta != 0, "ref delta must be non-zero");
        Delta {
            root_delta: 0,
            ref_delta,
        }
    }

    /// Builds a delta that only moves the root count.
    ///
    /// # Panics
    /// Panics if `root_delta` is zero.
    fn new_root_delta(root_delta: i32) -> Self {
        assert!(root_delta != 0, "root delta must be non-zero");
        Delta {
            ref_delta: 0,
            root_delta,
        }
    }

    /// Adds two deltas component-wise.
    ///
    /// Returns `None` when the sum is a no-op (both components zero), so callers can drop
    /// the pending change altogether.
    fn combine(self, other: Self) -> Option<Self> {
        let sum = Delta {
            ref_delta: self.ref_delta + other.ref_delta,
            root_delta: self.root_delta + other.root_delta,
        };
        let is_noop = sum.ref_delta == 0 && sum.root_delta == 0;
        (!is_noop).then_some(sum)
    }
}
/// State of a node held in memory by `StorageBackend`.
///
/// Entries for which `is_pending()` is true carry changes not yet written to the database and
/// are kept in the write cache; the others are kept in the read cache.
#[derive(PartialEq, Debug, Clone)]
enum CacheValue<H: WellBehavedHasher> {
/// A clean copy of a node loaded from (or just flushed to) the database.
Read { obj: OnDiskObject<H> },
/// A database node whose counts were changed by `update_counts` without the node having been
/// read via `get`/`cache`; `obj` already has `delta` applied to its ref count.
Update { delta: Delta, obj: OnDiskObject<H> },
/// Like `Update`, but the node has also been read (via `get`/`cache`) or was `Read` before
/// the change.
ReadAndUpdate { delta: Delta, obj: OnDiskObject<H> },
/// A node created by `cache` that is not in the database yet; its ref count is zero.
Create { obj: OnDiskObject<H> },
/// A `Create` node whose counts have since changed by `delta`.
CreateAndUpdate { obj: OnDiskObject<H>, delta: Delta },
/// A `CreateAndUpdate` node that has been `uncache`d; it stays while its pending `delta` is
/// non-zero and is dropped (releasing its children) once the delta cancels out.
CreateAndDelete { obj: OnDiskObject<H>, delta: Delta },
/// Placeholder swapped in temporarily by `update_counts`; carries no node.
Dummy,
}
impl<H: WellBehavedHasher> CacheValue<H> {
    /// Borrows the node carried by this entry.
    ///
    /// # Panics
    /// Panics on `CacheValue::Dummy`, which carries no node.
    fn get_obj(&self) -> &OnDiskObject<H> {
        match self {
            CacheValue::Dummy => unreachable!(),
            CacheValue::Read { obj } | CacheValue::Create { obj } => obj,
            CacheValue::Update { obj, .. }
            | CacheValue::ReadAndUpdate { obj, .. }
            | CacheValue::CreateAndUpdate { obj, .. }
            | CacheValue::CreateAndDelete { obj, .. } => obj,
        }
    }

    /// Whether this entry holds changes that have not been written to the database yet
    /// (such entries belong in the write cache).
    fn is_pending(&self) -> bool {
        match self {
            CacheValue::Update { .. }
            | CacheValue::ReadAndUpdate { .. }
            | CacheValue::Create { .. }
            | CacheValue::CreateAndUpdate { .. }
            | CacheValue::CreateAndDelete { .. } => true,
            CacheValue::Read { .. } | CacheValue::Dummy => false,
        }
    }
}
/// In-memory caching layer in front of a database `D` that tracks pending ref-count and
/// root-count changes until they are flushed.
///
/// Pending entries (see `CacheValue::is_pending`) live in the unbounded write cache; clean
/// entries live in the read cache, which is bounded by `cache_size` unless that is `0`.
#[derive(Debug)]
pub struct StorageBackend<D: DB> {
/// The backing database.
database: D,
/// Read-cache capacity and the write-cache target size used by
/// `flush_cache_evictions_to_db`; `0` means unbounded.
cache_size: usize,
/// Clean (non-pending) entries.
read_cache: Cache<ArenaHash<D::Hasher>, CacheValue<D::Hasher>>,
/// Pending entries not yet written to the database; created unbounded.
write_cache: Cache<ArenaHash<D::Hasher>, CacheValue<D::Hasher>>,
/// Keys registered with `cache` and not yet released by `uncache`; `gc` treats them as roots.
live_inserts: HashSet<ArenaHash<D::Hasher>>,
/// Hit/miss counters updated by `get`.
stats: RefCell<StorageBackendStats>,
}
/// Counters describing how `StorageBackend::get` lookups were served.
#[derive(Debug, Clone, Copy)]
pub struct StorageBackendStats {
/// Lookups answered from the in-memory caches.
pub get_cache_hits: usize,
/// Lookups that were not in memory and went to the database.
pub get_cache_misses: usize,
}
impl<D: DB> StorageBackend<D> {
pub(crate) fn new(cache_size: usize, database: D) -> Self {
let read_cache = if cache_size > 0 {
Cache::new(cache_size)
} else {
Cache::unbounded()
};
Self {
database,
cache_size,
read_cache,
write_cache: Cache::unbounded(),
live_inserts: HashSet::new(),
stats: RefCell::new(StorageBackendStats {
get_cache_hits: 0,
get_cache_misses: 0,
}),
}
}
/// Returns the node stored under `key`, or `None` if it is neither in memory nor in the
/// database.
///
/// On a hit, the entry is removed and re-inserted (an `Update` is upgraded to
/// `ReadAndUpdate`), and the hit counter is bumped. On a miss, the miss counter is bumped and
/// the node, if the database has it, is cached as a clean `Read` entry.
pub fn get(&mut self, key: &ArenaHash<D::Hasher>) -> Option<&OnDiskObject<D::Hasher>> {
    if self.peek_from_memory(key).is_none() {
        self.stats.borrow_mut().get_cache_misses += 1;
        if let Some(obj) = self.database.get_node(key) {
            self.cache_insert_new_key(key.clone(), CacheValue::Read { obj });
        }
        return self.peek_from_memory(key).map(CacheValue::get_obj);
    }

    self.stats.borrow_mut().get_cache_hits += 1;
    // Re-insert the entry; a node that had only been updated is now also marked as read.
    let refreshed = match self.remove_from_memory(key) {
        CacheValue::Update { delta, obj } => CacheValue::ReadAndUpdate { delta, obj },
        unchanged @ (CacheValue::Read { .. }
        | CacheValue::ReadAndUpdate { .. }
        | CacheValue::Create { .. }
        | CacheValue::CreateAndUpdate { .. }
        | CacheValue::CreateAndDelete { .. }
        | CacheValue::Dummy) => unchanged,
    };
    self.cache_insert_new_key(key.clone(), refreshed);
    Some(self.peek_from_memory(key).unwrap().get_obj())
}
/// Returns the effective root count of `key`: the count stored in the database plus any
/// root delta still pending in memory.
///
/// # Panics
/// Panics if the combined count is negative.
pub(crate) fn get_root_count(&self, key: &ArenaHash<D::Hasher>) -> u32 {
    let stored = self.database.get_root_count(key);
    let pending_root_delta = match self.peek_from_memory(key) {
        Some(
            CacheValue::Update { delta, .. }
            | CacheValue::ReadAndUpdate { delta, .. }
            | CacheValue::CreateAndUpdate { delta, .. }
            | CacheValue::CreateAndDelete { delta, .. },
        ) => delta.root_delta,
        Some(CacheValue::Read { .. } | CacheValue::Create { .. } | CacheValue::Dummy) | None => 0,
    };
    let total = stored as i32 + pending_root_delta;
    assert!(total >= 0, "root count must be non-negative");
    total as u32
}
/// Returns a snapshot of the `get` hit/miss counters.
pub fn get_stats(&self) -> StorageBackendStats {
    let snapshot = self.stats.borrow();
    *snapshot
}
/// Returns the root counts of all rooted keys.
///
/// Starts from the database's root map and overlays the effective count (see
/// [`Self::get_root_count`]) of every key with a pending write: keys whose effective count is
/// zero are removed, all others are inserted or overwritten.
pub fn get_roots(&self) -> HashMap<ArenaHash<D::Hasher>, u32> {
    let mut roots = self.database.get_roots();
    for key in self.write_cache.iter().map(|(k, _)| k) {
        match self.get_root_count(key) {
            0 => {
                roots.remove(key);
            }
            count => {
                roots.insert(key.clone(), count);
            }
        }
    }
    roots
}
/// Registers `key` as a live insert carrying `data` and referencing `children`.
///
/// If the node is already in memory, its entry is kept and re-inserted: an `Update` becomes
/// `ReadAndUpdate`, and a `CreateAndDelete` is revived as `CreateAndUpdate`. If it is only in
/// the database, it is cached as a clean `Read`. Otherwise each child's ref count is
/// incremented and the node is created in memory as `CacheValue::Create` with ref count zero.
///
/// # Panics
/// Panics if `key` is already live (cached without a matching `uncache`).
pub(crate) fn cache(
    &mut self,
    key: ArenaHash<D::Hasher>,
    data: std::vec::Vec<u8>,
    children: std::vec::Vec<ArenaKey<D::Hasher>>,
) {
    // `insert` returns false, leaving the set untouched, when the key is already live.
    let newly_live = self.live_inserts.insert(key.clone());
    assert!(
        newly_live,
        "a key can't be cached more than once without being uncached"
    );

    if self.peek_from_memory(&key).is_some() {
        let revived = match self.remove_from_memory(&key) {
            CacheValue::Update { delta, obj } => CacheValue::ReadAndUpdate { delta, obj },
            CacheValue::CreateAndDelete { obj, delta } => {
                CacheValue::CreateAndUpdate { obj, delta }
            }
            kept @ (CacheValue::Read { .. }
            | CacheValue::ReadAndUpdate { .. }
            | CacheValue::Create { .. }
            | CacheValue::CreateAndUpdate { .. }
            | CacheValue::Dummy) => kept,
        };
        self.cache_insert_new_key(key, revived);
    } else if let Some(obj) = self.database.get_node(&key) {
        self.cache_insert_new_key(key, CacheValue::Read { obj });
    } else {
        self.update_counts(&children, Delta::new_ref_delta(1));
        let obj = OnDiskObject {
            data,
            ref_count: 0,
            children,
        };
        self.cache_insert_new_key(key, CacheValue::Create { obj });
    }
}
/// Releases a live insert previously made by [`Self::cache`].
///
/// A freshly created node with no pending delta (`CacheValue::Create`) is dropped from memory
/// and its children's ref counts are decremented. A created node with a pending delta
/// (`CreateAndUpdate`) is kept and marked `CreateAndDelete`. Every other entry is left alone.
///
/// # Panics
/// Panics if `key` is not live, or if a `CacheValue::Create` entry has a non-zero ref count.
pub(crate) fn uncache(&mut self, key: &ArenaHash<D::Hasher>) {
    let was_live = self.live_inserts.remove(key);
    assert!(
        was_live,
        "a key can't be uncached more times than it was cached (0 or 1)"
    );

    let Some(entry) = self.peek_from_memory(key).cloned() else {
        return;
    };
    match entry {
        CacheValue::Create { obj } => {
            assert_eq!(
                obj.ref_count, 0,
                "CacheValue::Create values must have zero ref counts"
            );
            self.remove_from_memory(key);
            self.update_counts(&obj.children, Delta::new_ref_delta(-1));
        }
        CacheValue::CreateAndUpdate { obj, delta } => {
            self.write_cache
                .update_in_place(key.clone(), CacheValue::CreateAndDelete { obj, delta });
        }
        CacheValue::Read { .. }
        | CacheValue::Update { .. }
        | CacheValue::ReadAndUpdate { .. }
        | CacheValue::CreateAndDelete { .. }
        | CacheValue::Dummy => {}
    }
}
/// Decrements the root count of `key` by one, recorded as a pending change.
///
/// # Panics
/// Panics if `key` is neither in memory nor in the database.
pub fn unpersist(&mut self, key: &ArenaHash<D::Hasher>) {
    self.adjust_root_count(key, -1);
}
/// Increments the root count of `key` by one, recorded as a pending change.
///
/// # Panics
/// Panics if `key` is neither in memory nor in the database.
pub fn persist(&mut self, key: &ArenaHash<D::Hasher>) {
    self.adjust_root_count(key, 1);
}
/// Shifts the root count of `key` by the non-zero amount `by` via [`Self::update_counts`].
fn adjust_root_count(&mut self, key: &ArenaHash<D::Hasher>, by: i32) {
    let target = [ArenaKey::Ref(key.clone())];
    self.update_counts(&target, Delta::new_root_delta(by));
}
/// Loads nodes reachable from `key` out of the database into the read cache as clean `Read`
/// entries.
///
/// Nodes already in memory are exposed to `bfs_get_nodes` through the lookup closure. The
/// number of nodes fetched is capped at `cache_size`, or left uncapped when that is `0`.
/// `max_depth` and `truncate` are passed through to `bfs_get_nodes` unchanged.
pub fn pre_fetch(
    &mut self,
    key: &ArenaHash<D::Hasher>,
    max_depth: Option<usize>,
    truncate: bool,
) {
    let max_count = (self.cache_size > 0).then_some(self.cache_size);
    let fetched = self.database.bfs_get_nodes(
        key,
        |k| self.peek_from_memory(k).map(|cv| cv.get_obj().clone()),
        truncate,
        max_depth,
        max_count,
    );
    // Insert in reverse of the returned order. Assuming `bfs_get_nodes` yields breadth-first
    // order, the nodes nearest `key` end up as the most recently inserted.
    for (k, obj) in fetched.into_iter().rev() {
        self.cache_insert_new_key(k, CacheValue::Read { obj });
    }
}
/// Writes the given pending entries to the database in a single batch and re-caches each
/// one in memory as a clean `CacheValue::Read`.
///
/// A node is written with `Update::InsertNode` when it was created in memory or its ref count
/// changed; a root count is written with `Update::SetRootCount` when a root delta is pending.
///
/// # Panics
/// Panics if any entry is a `CacheValue::Read` (clean entries are never flushed), or if a
/// resulting root count would be negative.
fn flush_to_db<I>(&mut self, writes: I)
where
    I: Iterator<Item = (ArenaHash<D::Hasher>, CacheValue<D::Hasher>)>,
{
    let mut updates = std::vec::Vec::new();
    for (key, pending) in writes {
        if let CacheValue::Read { .. } = pending {
            panic!("BUG: unexpected CacheValue::Read!")
        }
        let clean = CacheValue::Read {
            obj: pending.get_obj().clone(),
        };
        self.cache_insert_new_key(key.clone(), clean);

        match pending {
            CacheValue::Read { .. } => unreachable!("already handled Read above"),
            CacheValue::Create { obj } => updates.push((key, Update::InsertNode(obj))),
            CacheValue::CreateAndUpdate { obj, delta }
            | CacheValue::CreateAndDelete { obj, delta } => {
                updates.push((key.clone(), Update::InsertNode(obj)));
                // A created node's pending root delta is written as its absolute root count.
                if delta.root_delta != 0 {
                    assert!(delta.root_delta > 0, "root count can't be negative");
                    updates.push((key, Update::SetRootCount(delta.root_delta as u32)));
                }
            }
            CacheValue::Update { delta, obj } | CacheValue::ReadAndUpdate { delta, obj } => {
                if delta.ref_delta != 0 {
                    updates.push((key.clone(), Update::InsertNode(obj)));
                }
                if delta.root_delta != 0 {
                    let new_root_count =
                        self.database.get_root_count(&key) as i32 + delta.root_delta;
                    assert!(new_root_count >= 0, "roots counts can't be negative!");
                    updates.push((key, Update::SetRootCount(new_root_count as u32)));
                }
            }
            CacheValue::Dummy => {}
        }
    }
    self.database.batch_update(updates.into_iter());
}
pub fn flush_cache_evictions_to_db(&mut self) {
if self.cache_size == 0 {
return;
}
let mut evictions = HashMap::new();
while self.write_cache.len() > self.cache_size {
let (k, v) = self.write_cache.pop_lru().unwrap();
evictions.insert(k, v);
}
self.flush_to_db(evictions.into_iter());
}
/// Flushes every pending write to the database and empties the write cache; the flushed
/// entries are re-cached as clean `Read` entries.
pub fn flush_all_changes_to_db(&mut self) {
    let mut pending = std::vec::Vec::with_capacity(self.write_cache.len());
    pending.extend(
        self.write_cache
            .iter()
            .map(|(k, v)| (k.clone(), v.clone())),
    );
    self.write_cache.clear();
    self.flush_to_db(pending.into_iter());
}
pub fn gc(&mut self) {
let db_unreachable_keys = self.database.get_unreachable_keys();
let mut mem_unreachable_keys = vec![];
for (k, v) in self.write_cache.iter() {
if v.get_obj().ref_count == 0 {
mem_unreachable_keys.push(k.clone());
}
}
let root_keys: HashSet<_> = self
.get_roots()
.into_keys()
.chain(self.live_inserts.clone())
.collect();
let mut unreachable_keys: std::vec::Vec<_> = db_unreachable_keys
.into_iter()
.chain(mem_unreachable_keys)
.filter(|k| !root_keys.contains(k))
.collect();
let mut keys_to_delete = vec![];
while let Some(key) = unreachable_keys.pop() {
let max_depth = 1;
let truncate = false;
self.pre_fetch(&key, Some(max_depth), truncate);
let node = self.peek_from_memory(&key).unwrap().get_obj().clone();
self.update_counts(&node.children, Delta::new_ref_delta(-1));
for child_key in node.children.iter().flat_map(ArenaKey::refs) {
if self.get(child_key).unwrap().ref_count == 0 && !root_keys.contains(child_key) {
unreachable_keys.push(child_key.clone());
}
}
keys_to_delete.push(key);
}
for key in &keys_to_delete {
self.write_cache.remove(key);
self.read_cache.remove(key);
}
let batch_deletes = keys_to_delete.into_iter().map(|k| (k, Update::DeleteNode));
self.database.batch_update(batch_deletes);
}
/// Looks `key` up in memory with `peek`, checking the write cache before the read cache.
fn peek_from_memory(&self, key: &ArenaHash<D::Hasher>) -> Option<&CacheValue<D::Hasher>> {
    self.write_cache
        .peek(key)
        .or_else(|| self.read_cache.peek(key))
}
/// Mutable counterpart of `peek_from_memory`: write cache first, then read cache.
fn peek_mut_from_memory(
    &mut self,
    key: &ArenaHash<D::Hasher>,
) -> Option<&mut CacheValue<D::Hasher>> {
    match self.write_cache.peek_mut(key) {
        Some(pending) => Some(pending),
        None => self.read_cache.peek_mut(key),
    }
}
/// Removes `key` from whichever cache holds it (write cache first) and returns its entry.
///
/// # Panics
/// Panics if `key` is in neither cache.
fn remove_from_memory(&mut self, key: &ArenaHash<D::Hasher>) -> CacheValue<D::Hasher> {
    let removed = match self.write_cache.remove(key) {
        Some(entry) => Some(entry),
        None => self.read_cache.remove(key),
    };
    removed.unwrap_or_else(|| panic!("key must be in memory")).1
}
/// Applies `delta` to every node hash referenced by `children` (via `ArenaKey::refs`).
///
/// For a node already in memory, its entry moves to its next state:
/// - `Read` becomes `ReadAndUpdate`, and `Create` becomes `CreateAndUpdate`.
/// - `Update`, `ReadAndUpdate`, `CreateAndUpdate` and `CreateAndDelete` combine the new delta
///   with the pending one.
///
/// If the combined delta is a no-op:
/// - `Update` is dropped from memory entirely;
/// - `ReadAndUpdate` falls back to `Read`;
/// - `CreateAndUpdate` falls back to `Create`;
/// - `CreateAndDelete` is dropped and its own children are recursively decremented by one.
///
/// An entry whose pending-ness changes is re-inserted so it lands in the other cache; otherwise
/// it is promoted.
///
/// A node that is not in memory is loaded from the database, has the delta applied to its ref
/// count, and is inserted as `CacheValue::Update`.
///
/// # Panics
/// Panics if a referenced node is neither in memory nor in the database ("can't update unknown
/// object"), or if a ref count would leave the `u64` range (`OnDiskObject::apply_delta`).
fn update_counts(&mut self, children: &[ArenaKey<D::Hasher>], delta: Delta) {
for key in children.iter().flat_map(ArenaKey::refs) {
if let Some(cache_val) = self.peek_mut_from_memory(key) {
// Move the entry out by value, leaving a `Dummy` placeholder in its slot, so the
// match below can consume it.
let mut tmp = CacheValue::Dummy;
std::mem::swap(&mut tmp, cache_val);
let was_pending = tmp.is_pending();
// Outcome of the state transition, applied to the slot by the second `match`.
enum Action<H: WellBehavedHasher> {
Replace(CacheValue<H>),
Remove,
RemoveWithChildren(OnDiskObject<H>),
}
use Action::*;
let action = match tmp {
CacheValue::Read { obj } => {
let obj = obj.apply_delta(delta);
Replace(CacheValue::ReadAndUpdate { obj, delta })
}
CacheValue::Update {
delta: old_delta,
obj,
} => {
let obj = obj.apply_delta(delta);
#[allow(clippy::manual_map)]
if let Some(delta) = delta.combine(old_delta) {
Replace(CacheValue::Update { delta, obj })
} else {
Remove
}
}
CacheValue::ReadAndUpdate {
obj,
delta: old_delta,
} => {
let obj = obj.apply_delta(delta);
if let Some(delta) = delta.combine(old_delta) {
Replace(CacheValue::ReadAndUpdate { obj, delta })
} else {
Replace(CacheValue::Read { obj })
}
}
CacheValue::Create { obj } => {
let obj = obj.apply_delta(delta);
Replace(CacheValue::CreateAndUpdate { obj, delta })
}
CacheValue::CreateAndUpdate {
obj,
delta: old_delta,
} => {
let obj = obj.apply_delta(delta);
if let Some(delta) = delta.combine(old_delta) {
Replace(CacheValue::CreateAndUpdate { obj, delta })
} else {
Replace(CacheValue::Create { obj })
}
}
CacheValue::CreateAndDelete {
obj,
delta: old_delta,
} => {
let obj = obj.apply_delta(delta);
if let Some(delta) = delta.combine(old_delta) {
Replace(CacheValue::CreateAndDelete { obj, delta })
} else {
RemoveWithChildren(obj)
}
}
CacheValue::Dummy => Remove,
};
match action {
Replace(obj) => {
*cache_val = obj;
// Pending entries belong in the write cache and clean ones in the read cache
// (see `cache_insert_new_key`), so a change in pending-ness means moving caches.
if was_pending != cache_val.is_pending() {
let value = self.remove_from_memory(key);
self.cache_insert_new_key(key.clone(), value);
} else {
self.promote(key);
}
}
Remove => {
self.remove_from_memory(key);
}
RemoveWithChildren(obj) => {
// The `CreateAndDelete` entry's delta cancelled out: drop it and release its
// references to its own children.
self.remove_from_memory(key);
self.update_counts(&obj.children, Delta::new_ref_delta(-1));
}
}
} else {
// Not in memory: load from the database and record the change as a pending `Update`.
let obj = self
.database
.get_node(key)
.expect("can't update unknown object");
let obj = obj.apply_delta(delta);
self.cache_insert_new_key(key.clone(), CacheValue::Update { delta, obj })
}
}
}
/// Promotes `key` in the LRU order of the write cache, falling back to the read cache only
/// when the write cache's `promote` reports failure.
fn promote(&mut self, key: &ArenaHash<D::Hasher>) {
    if !self.write_cache.promote(key) {
        let _ = self.read_cache.promote(key);
    }
}
/// Inserts `value` for a key that is not yet in memory, routing it by pending-ness: pending
/// entries go to the write cache, clean ones to the read cache (which may hand back an evicted
/// entry).
///
/// # Panics
/// Panics if the write cache evicts anything. In debug builds, also panics if `key` is already
/// in memory or if the read cache evicts a pending entry.
fn cache_insert_new_key(&mut self, key: ArenaHash<D::Hasher>, value: CacheValue<D::Hasher>) {
    debug_assert!(
        self.peek_from_memory(&key).is_none(),
        "key must not already be in memory"
    );
    if !value.is_pending() {
        if let Some((_, evicted)) = self.read_cache.set(key, value) {
            debug_assert!(!evicted.is_pending(), "read cache shouldn't contain writes");
        }
        return;
    }
    let displaced = self.write_cache.set(key, value);
    assert!(displaced.is_none(), "write cache is unbounded, it can't evict");
}
/// Number of pending entries not yet flushed to the database.
pub fn get_write_cache_len(&self) -> usize {
    let pending = &self.write_cache;
    pending.len()
}
/// Total [`OnDiskObject::size`] of all pending entries.
pub fn get_write_cache_obj_bytes(&self) -> usize {
    let mut total = 0;
    for (_, pending) in self.write_cache.iter() {
        total += pending.get_obj().size();
    }
    total
}
}
/// A node in the form the database stores it (`Update::InsertNode` / `DB::get_node`): its
/// payload bytes, its reference count, and the keys of the nodes it references.
#[derive(Debug, Clone)]
pub struct OnDiskObject<H: WellBehavedHasher> {
/// The node's raw payload bytes.
pub(crate) data: std::vec::Vec<u8>,
/// Number of references from other nodes: incremented when a node listing this one in its
/// `children` is created by `StorageBackend::cache`. Root counts are tracked separately.
pub ref_count: u64,
/// Keys of the nodes this object references.
pub children: std::vec::Vec<ArenaKey<H>>,
}
impl<H: WellBehavedHasher> Serializable for OnDiskObject<H> {
    /// Writes `data`, then `ref_count`, then `children`; `Deserializable` reads them back in
    /// the same order.
    fn serialize(&self, writer: &mut impl std::io::Write) -> std::io::Result<()> {
        self.data.serialize(writer)?;
        self.ref_count.serialize(writer)?;
        self.children.serialize(writer)
    }

    /// Sum of the serialized sizes of the three fields.
    fn serialized_size(&self) -> usize {
        let data_len = self.data.serialized_size();
        let ref_count_len = self.ref_count.serialized_size();
        let children_len = self.children.serialized_size();
        data_len + ref_count_len + children_len
    }
}
impl<H: WellBehavedHasher> Deserializable for OnDiskObject<H> {
    /// Reads `data`, `ref_count` and `children` in the order `serialize` writes them,
    /// passing `recursion_depth` through to each field unchanged.
    fn deserialize(
        reader: &mut impl std::io::Read,
        recursion_depth: u32,
    ) -> Result<Self, std::io::Error> {
        // Struct-literal fields are evaluated in the order written, preserving read order.
        Ok(OnDiskObject {
            data: Deserializable::deserialize(reader, recursion_depth)?,
            ref_count: Deserializable::deserialize(reader, recursion_depth)?,
            children: Deserializable::deserialize(reader, recursion_depth)?,
        })
    }
}
impl<H: WellBehavedHasher> OnDiskObject<H> {
    /// Returns this object with `delta.ref_delta` added to its reference count.
    /// `delta.root_delta` is not used (the object has no root-count field).
    ///
    /// # Panics
    /// Panics if the new reference count would fall below zero or exceed `u64::MAX`.
    fn apply_delta(self, delta: Delta) -> Self {
        let ref_count = self
            .ref_count
            .checked_add_signed(i64::from(delta.ref_delta))
            .expect("ref count can't go out of u64 bounds");
        OnDiskObject { ref_count, ..self }
    }

    /// Approximate footprint of this object in bytes: the payload, the reference count, and
    /// one hash-sized entry per child key. Assumes each child key costs one hash; TODO confirm
    /// for non-`Ref` `ArenaKey` variants.
    pub fn size(&self) -> usize {
        let data_size = self.data.len();
        // `ref_count` is a `u64` (8 bytes); it was previously counted as 4.
        let ref_count_size = std::mem::size_of_val(&self.ref_count);
        let bytes_per_arena_key = <H as crypto::digest::OutputSizeUser>::output_size();
        let children_refs_size = self.children.len() * bytes_per_arena_key;
        data_size + ref_count_size + children_refs_size
    }
}
impl<H: WellBehavedHasher> PartialEq for OnDiskObject<H> {
    /// Field-wise equality on `data`, `ref_count` and `children`. Written by hand rather than
    /// derived, so no `PartialEq` bound is placed on `H` itself.
    fn eq(&self, other: &Self) -> bool {
        let lhs = (&self.data, self.ref_count, &self.children);
        let rhs = (&other.data, other.ref_count, &other.children);
        lhs == rhs
    }
}
impl<H: WellBehavedHasher> Distribution<OnDiskObject<H>> for Standard {
    /// Samples a random object: up to 9 random data bytes, a ref count in `0..=i64::MAX`,
    /// and up to 9 random `ArenaKey::Ref` children.
    fn sample<R: rand::prelude::Rng + ?Sized>(&self, rng: &mut R) -> OnDiskObject<H> {
        /// Builds a vector of random length below `MAX_LEN`, each element produced by `make`.
        fn rand_vec<T, R: rand::prelude::Rng + ?Sized>(
            rng: &mut R,
            make: impl Fn(&mut R) -> T,
        ) -> std::vec::Vec<T> {
            const MAX_LEN: usize = 10;
            let len = rng.gen_range(0..MAX_LEN);
            (0..len).map(|_| make(&mut *rng)).collect()
        }

        // Draw in the same order as the fields: data, ref_count, children.
        let data = rand_vec(rng, |r| r.r#gen());
        let ref_count = rng.gen_range(0..=i64::MAX as u64);
        let children = rand_vec(rng, |r| ArenaKey::Ref(r.r#gen()));
        OnDiskObject {
            data,
            ref_count,
            children,
        }
    }
}
#[cfg(test)]
pub(crate) mod raw_node {
    use super::*;
    use crate::DefaultHasher;

    /// A hand-assembled node for tests. Its key is built directly from raw bytes via
    /// `ArenaHash::_from_bytes`, and those same bytes serve as its data payload.
    #[derive(Debug, Clone)]
    pub(crate) struct RawNode<H: WellBehavedHasher = DefaultHasher> {
        pub(crate) key: ArenaHash<H>,
        #[allow(dead_code)]
        pub(crate) data: std::vec::Vec<u8>,
        pub(crate) children: std::vec::Vec<ArenaKey<H>>,
        pub(crate) ref_count: u64,
    }

    impl<H: WellBehavedHasher> RawNode<H> {
        /// Builds a node keyed and filled by `key`, referencing each of `children` by key.
        pub(crate) fn new(
            key: &[u8],
            ref_count: u64,
            children: std::vec::Vec<&RawNode<H>>,
        ) -> Self {
            let child_keys = children
                .iter()
                .map(|child| ArenaKey::Ref(child.key.clone()))
                .collect();
            RawNode {
                key: ArenaHash::_from_bytes(key),
                data: key.to_vec(),
                children: child_keys,
                ref_count,
            }
        }

        /// Registers this node with `backend` through [`StorageBackend::cache`].
        pub(crate) fn cache_into_backend<D: DB<Hasher = H>>(
            &self,
            backend: &mut StorageBackend<D>,
        ) {
            let (key, data, children) =
                (self.key.clone(), self.data.clone(), self.children.clone());
            backend.cache(key, data, children);
        }

        /// Writes this node straight into `db`, bypassing any storage backend.
        pub(crate) fn insert_into_db<D: DB<Hasher = H>>(&self, db: &mut D) {
            let key = self.key.clone();
            let obj = self.clone().into_obj();
            db.insert_node(key, obj);
        }

        /// Converts the node into the [`OnDiskObject`] form, dropping its key.
        pub(crate) fn into_obj(self) -> OnDiskObject<H> {
            let RawNode {
                data,
                ref_count,
                children,
                ..
            } = self;
            OnDiskObject {
                data,
                ref_count,
                children,
            }
        }
    }
}
#[cfg(test)]
mod tests {
use crate::{self as storage, storable::child_from};
use crypto::digest::Digest;
use derive_where::derive_where;
use raw_node::RawNode;
use crate::{
Storable,
arena::{IntermediateRepr, IrLoader, Sp, hash},
db::InMemoryDB,
storable::Loader,
};
use super::*;
fn childless_hash<D: DB, T: Storable<D>>(val: &T) -> ArenaHash<D::Hasher> {
let mut hasher = D::Hasher::default();
let mut bytes: std::vec::Vec<u8> = std::vec::Vec::new();
val.to_binary_repr(&mut bytes)
.expect("Storable data should be able to be represented in binary");
hasher.update(bytes);
ArenaHash(hasher.finalize())
}
#[test]
fn cache_overflow_into_db_inmemorydb() {
test_cache_overflow_into_db::<InMemoryDB>();
}
#[cfg(feature = "sqlite")]
#[test]
fn cache_overflow_into_db_sqldb() {
test_cache_overflow_into_db::<crate::db::SqlDB>();
}
#[cfg(feature = "parity-db")]
#[test]
fn cache_overflow_into_db_paritydb() {
test_cache_overflow_into_db::<crate::db::ParityDb>();
}
fn test_cache_overflow_into_db<D: DB + Default>() {
let mut backend = StorageBackend::new(2, D::default());
let (k1, d1) = (childless_hash::<D, u8>(&0), vec![0]);
let (k2, d2) = (childless_hash::<D, u8>(&1), vec![1]);
let (k3, d3) = (childless_hash::<D, u8>(&2), vec![2]);
backend.cache(k1.clone(), d1.clone(), vec![]);
backend.cache(k2.clone(), d2.clone(), vec![]);
backend.cache(k3.clone(), d3.clone(), vec![]);
assert!(backend.write_cache.peek(&k1).is_some());
assert!(backend.write_cache.peek(&k2).is_some());
assert!(backend.write_cache.peek(&k3).is_some());
assert_eq!(backend.write_cache.len(), 3);
backend.flush_cache_evictions_to_db();
assert_eq!(backend.write_cache.len(), 2);
assert_eq!(backend.get(&k1).unwrap().data, d1);
assert!(backend.write_cache.peek(&k1).is_none());
assert!(backend.read_cache.peek(&k1).is_some());
assert_eq!(backend.get(&k2).unwrap().data, d2);
assert!(backend.write_cache.peek(&k2).is_some());
assert_eq!(backend.get(&k3).unwrap().data, d3);
assert!(backend.write_cache.peek(&k3).is_some());
}
fn in_database_repr<D: DB, T: Storable<D>>(
val: T,
) -> (ArenaHash<D::Hasher>, std::vec::Vec<u8>) {
let mut bytes: std::vec::Vec<u8> = std::vec::Vec::new();
Storable::to_binary_repr(&val, &mut bytes)
.expect("Failed to serialize to 'std::vec::Vec<u8>'!");
let key = hash::<D::Hasher>(&bytes, val.children().iter().map(ArenaKey::hash));
(key, bytes)
}
#[test]
fn storage_backend_inmemorydb() {
test_storage_backend::<InMemoryDB>();
}
#[cfg(feature = "sqlite")]
#[test]
fn storage_backend_sqldb() {
test_storage_backend::<crate::db::SqlDB>();
}
#[cfg(feature = "parity-db")]
#[test]
fn storage_backend_paritydb() {
test_storage_backend::<crate::db::ParityDb>();
}
fn test_storage_backend<D: DB>() {
let mut storage_backend = StorageBackend::new(16, D::default());
let (key, bytes) = in_database_repr::<D, u32>(10);
storage_backend.cache(key.clone(), bytes.clone(), vec![]);
assert_eq!(storage_backend.get(&key).unwrap().data, bytes);
assert_eq!(storage_backend.get(&key).unwrap().ref_count, 0);
assert_eq!(storage_backend.database.size(), 0);
assert!(storage_backend.peek_from_memory(&key).unwrap().is_pending());
storage_backend.pre_fetch(&key, None, true);
assert!(storage_backend.write_cache.peek(&key).is_some());
storage_backend.persist(&key);
assert!(storage_backend.peek_from_memory(&key).unwrap().is_pending());
assert_eq!(storage_backend.get(&key).unwrap().ref_count, 0);
assert_eq!(storage_backend.get_root_count(&key), 1);
assert_eq!(storage_backend.database.size(), 0);
storage_backend.flush_all_changes_to_db();
assert_eq!(storage_backend.database.size(), 1);
assert!(storage_backend.write_cache.peek(&key).is_none());
assert!(storage_backend.read_cache.peek(&key).is_some());
assert!(storage_backend.database.get_node(&key).is_some());
storage_backend.uncache(&key);
storage_backend.cache(key.clone(), bytes.clone(), vec![]);
assert_eq!(storage_backend.get(&key).unwrap().data, bytes);
assert_eq!(storage_backend.get(&key).unwrap().ref_count, 0);
assert_eq!(storage_backend.get_root_count(&key), 1);
assert!(!storage_backend.peek_from_memory(&key).unwrap().is_pending());
storage_backend.flush_all_changes_to_db();
storage_backend.write_cache.clear();
assert_eq!(storage_backend.get(&key).unwrap().ref_count, 0);
assert_eq!(storage_backend.get_root_count(&key), 1);
storage_backend.write_cache.clear();
assert!(!storage_backend.peek_from_memory(&key).unwrap().is_pending());
storage_backend.persist(&key);
assert_eq!(storage_backend.database.size(), 1);
assert_eq!(storage_backend.get(&key).unwrap().ref_count, 0);
assert_eq!(
storage_backend.database.get_node(&key).unwrap().ref_count,
0
);
assert_eq!(storage_backend.get_root_count(&key), 2);
storage_backend.unpersist(&key);
assert_eq!(storage_backend.get_root_count(&key), 1);
storage_backend.uncache(&key);
storage_backend.unpersist(&key);
assert_eq!(storage_backend.get(&key).unwrap().ref_count, 0);
assert_eq!(storage_backend.get_root_count(&key), 0);
storage_backend.flush_all_changes_to_db();
storage_backend.gc();
assert_eq!(storage_backend.database.size(), 0);
assert!(storage_backend.get(&key).is_none());
}
#[derive(Debug, Storable)]
#[derive_where(Clone)]
#[storable(db = D)]
struct LabeledNode<D: DB> {
label: u32,
children: std::vec::Vec<Sp<Self, D>>,
}
impl<D: DB> Eq for LabeledNode<D> {}
impl<D: DB> PartialEq for LabeledNode<D> {
fn eq(&self, other: &Self) -> bool {
self.label == other.label && self.children == other.children
}
}
#[test]
fn storage_backend_trees_inmemorydb() {
test_storage_backend_trees::<InMemoryDB>();
}
#[cfg(feature = "sqlite")]
#[test]
fn storage_backend_trees_sqldb() {
test_storage_backend_trees::<crate::db::SqlDB>();
}
#[cfg(feature = "parity-db")]
#[test]
fn storage_backend_trees_paritydb() {
test_storage_backend_trees::<crate::db::ParityDb>();
}
fn test_storage_backend_trees<D: DB>() {
let storage = crate::Storage::<D>::default();
let arena = &storage.arena;
let child = LabeledNode {
label: 0,
children: vec![],
};
let parent = LabeledNode {
label: 1,
children: vec![arena.alloc(child.clone())],
};
let gp = LabeledNode {
label: 2,
children: vec![arena.alloc(parent.clone()), arena.alloc(child.clone())],
};
let (child_key, child_bytes) = in_database_repr(child.clone());
let (parent_key, parent_bytes) = in_database_repr(parent.clone());
let (gp_key, gp_bytes) = in_database_repr(gp.clone());
let child_child_repr = child_from(&child_bytes, &[]);
let parent_child_repr = child_from(&parent_bytes, std::slice::from_ref(&child_child_repr));
let gp_child_repr = child_from(
&gp_bytes,
&[parent_child_repr.clone(), child_child_repr.clone()],
);
let keys_to_childref = HashMap::from([
(child_key.clone(), child_child_repr.clone()),
(parent_key.clone(), parent_child_repr.clone()),
(gp_key.clone(), gp_child_repr.clone()),
]);
let child_reconstructed = <LabeledNode<D> as Storable<D>>::from_binary_repr(
&mut child_bytes.clone().as_slice(),
&mut vec![].into_iter(),
&IrLoader::new(arena, &HashMap::new(), HashMap::new()),
)
.unwrap();
assert_eq!(child_reconstructed, child);
let all: HashMap<ArenaHash<D::Hasher>, IntermediateRepr<D>> = HashMap::from([(
child_key.clone(),
IntermediateRepr::<D>::from_storable(&child),
)]);
let parent_reconstructed = <LabeledNode<D> as Storable<D>>::from_binary_repr(
&mut parent_bytes.clone().as_slice(),
&mut vec![ArenaKey::Ref(child_key.clone())].into_iter(),
&IrLoader::new(arena, &all, keys_to_childref.clone()),
)
.unwrap();
assert_eq!(parent_reconstructed, parent);
let all: HashMap<ArenaHash<D::Hasher>, IntermediateRepr<D>> = HashMap::from([
(child_key.clone(), IntermediateRepr::from_storable(&child)),
(parent_key.clone(), IntermediateRepr::from_storable(&parent)),
]);
let gp_reconstructed = <LabeledNode<D> as Storable<D>>::from_binary_repr(
&mut gp_bytes.clone().as_slice(),
&mut vec![
ArenaKey::Ref(parent_key.clone()),
ArenaKey::Ref(child_key.clone()),
]
.into_iter(),
&IrLoader::new(arena, &all, keys_to_childref.clone()),
)
.unwrap();
assert_eq!(gp_reconstructed, gp);
let mut storage_backend = StorageBackend::<D>::new(16, D::default());
storage_backend.cache(child_key.clone(), child_bytes.clone(), vec![]);
storage_backend.cache(
parent_key.clone(),
parent_bytes.clone(),
vec![ArenaKey::Ref(child_key.clone())],
);
assert_eq!(storage_backend.get(&child_key).unwrap().data, child_bytes);
assert_eq!(storage_backend.get(&parent_key).unwrap().data, parent_bytes);
assert_eq!(storage_backend.get(&child_key).unwrap().ref_count, 1);
assert_eq!(storage_backend.get(&parent_key).unwrap().ref_count, 0);
assert_eq!(storage_backend.database.size(), 0);
assert!(storage_backend.write_cache.peek(&child_key).is_some());
assert!(storage_backend.write_cache.peek(&parent_key).is_some());
storage_backend.persist(&parent_key);
assert_eq!(storage_backend.get(&child_key).unwrap().ref_count, 1);
assert_eq!(storage_backend.get(&parent_key).unwrap().ref_count, 0);
assert_eq!(storage_backend.get_root_count(&child_key), 0);
assert_eq!(storage_backend.get_root_count(&parent_key), 1);
assert_eq!(storage_backend.database.size(), 0);
assert!(storage_backend.write_cache.peek(&parent_key).is_some());
assert!(storage_backend.write_cache.peek(&child_key).is_some());
storage_backend.flush_all_changes_to_db();
assert_eq!(storage_backend.database.size(), 2);
assert!(storage_backend.write_cache.peek(&parent_key).is_none());
assert!(storage_backend.read_cache.peek(&parent_key).is_some());
assert!(storage_backend.write_cache.peek(&child_key).is_none());
assert!(storage_backend.read_cache.peek(&child_key).is_some());
storage_backend.uncache(&child_key);
storage_backend.cache(child_key.clone(), child_bytes, vec![]);
assert_eq!(storage_backend.get(&child_key).unwrap().ref_count, 1);
assert!(
!storage_backend
.peek_from_memory(&child_key)
.unwrap()
.is_pending()
);
storage_backend.uncache(&child_key);
assert_eq!(storage_backend.get(&child_key).unwrap().ref_count, 1);
storage_backend.unpersist(&parent_key);
storage_backend.uncache(&parent_key);
assert_eq!(storage_backend.database.size(), 2);
assert_eq!(storage_backend.get(&parent_key).unwrap().ref_count, 0);
assert_eq!(storage_backend.get(&child_key).unwrap().ref_count, 1);
assert_eq!(storage_backend.get_root_count(&parent_key), 0);
assert_eq!(storage_backend.get_root_count(&child_key), 0);
storage_backend.flush_all_changes_to_db();
assert_eq!(storage_backend.database.size(), 2);
storage_backend.gc();
assert_eq!(storage_backend.database.size(), 0);
}
#[test]
fn ref_counting_inmemorydb() {
test_ref_counting::<InMemoryDB>();
}
#[cfg(feature = "sqlite")]
#[test]
fn ref_counting_sqldb() {
test_ref_counting::<crate::db::SqlDB>();
}
#[cfg(feature = "parity-db")]
#[test]
fn ref_counting_paritydb() {
test_ref_counting::<crate::db::ParityDb>();
}
fn test_ref_counting<D: DB>() {
let n41 = RawNode::new(&[4, 1], 4, vec![]);
let n31 = RawNode::new(&[3, 1], 2, vec![&n41]);
let n32 = RawNode::new(&[3, 2], 3, vec![&n41]);
let n33 = RawNode::new(&[3, 3], 2, vec![&n41]);
let n21 = RawNode::new(&[2, 1], 1, vec![&n31, &n32]);
let n22 = RawNode::new(&[2, 2], 1, vec![&n32, &n33]);
let n11 = RawNode::new(&[1, 1], 0, vec![&n41, &n31, &n32, &n33, &n21, &n22]);
let nodes = [&n41, &n31, &n32, &n33, &n21, &n22, &n11];
let cache_size = 16;
let init_backend = || {
let mut backend = StorageBackend::new(cache_size, D::default());
for n in nodes {
n.cache_into_backend(&mut backend);
}
backend
};
let mut backend = init_backend();
for n in nodes {
assert_eq!(backend.get(&n.key).unwrap().ref_count, n.ref_count);
assert_eq!(backend.get_root_count(&n.key), 0);
}
for n in [&n41, &n31, &n32, &n33, &n21, &n22] {
backend.uncache(&n.key);
}
for n in nodes {
assert_eq!(
backend
.peek_from_memory(&n.key)
.unwrap()
.get_obj()
.ref_count,
n.ref_count
);
assert_eq!(backend.get_root_count(&n.key), 0);
}
backend.uncache(&n11.key);
for n in nodes {
assert!(backend.peek_from_memory(&n.key).is_none());
assert_eq!(backend.get_root_count(&n.key), 0);
}
let mut backend = init_backend();
for n in [&n41, &n31, &n32, &n33, &n21, &n22] {
backend.uncache(&n.key);
}
n31.cache_into_backend(&mut backend);
n33.cache_into_backend(&mut backend);
backend.uncache(&n11.key);
for (n, r) in [(&n41, 2), (&n31, 0), (&n33, 0)] {
assert_eq!(
backend
.peek_from_memory(&n.key)
.unwrap()
.get_obj()
.ref_count,
r
);
assert_eq!(backend.get_root_count(&n.key), 0);
}
for n in [&n32, &n21, &n22, &n11] {
assert!(backend.get(&n.key).is_none());
assert_eq!(backend.get_root_count(&n.key), 0);
}
backend.uncache(&n31.key);
backend.uncache(&n33.key);
for n in nodes {
assert!(backend.get(&n.key).is_none());
assert_eq!(backend.get_root_count(&n.key), 0);
}
let mut backend = init_backend();
for n in nodes {
assert_eq!(backend.get(&n.key).unwrap().ref_count, n.ref_count);
assert_eq!(backend.get_root_count(&n.key), 0);
}
let mut reversed_nodes = nodes;
reversed_nodes.reverse();
for (i, n) in reversed_nodes.iter().enumerate() {
backend.uncache(&n.key);
assert!(backend.peek_from_memory(&n.key).is_none());
for m in &reversed_nodes[i + 1..] {
assert!(backend.peek_from_memory(&m.key).is_some());
}
assert_eq!(backend.get_root_count(&n.key), 0);
}
let mut backend = init_backend();
backend.persist(&n11.key);
for (n, rt) in [
(&n41, 0),
(&n31, 0),
(&n32, 0),
(&n33, 0),
(&n21, 0),
(&n22, 0),
(&n11, 1),
] {
assert_eq!(backend.get(&n.key).unwrap().ref_count, n.ref_count);
assert_eq!(backend.get_root_count(&n.key), rt);
}
for n in nodes {
backend.uncache(&n.key);
}
for (n, rt) in [
(&n41, 0),
(&n31, 0),
(&n32, 0),
(&n33, 0),
(&n21, 0),
(&n22, 0),
(&n11, 1),
] {
assert_eq!(backend.get(&n.key).unwrap().ref_count, n.ref_count);
assert_eq!(backend.get_root_count(&n.key), rt);
}
backend.unpersist(&n11.key);
for n in nodes {
assert!(backend.peek_from_memory(&n.key).is_none());
assert_eq!(backend.get_root_count(&n.key), 0);
}
let mut backend = StorageBackend::new(cache_size, D::default());
for n in nodes {
n.cache_into_backend(&mut backend);
backend.flush_all_changes_to_db();
assert!(backend.write_cache.peek(&n.key).is_none());
assert!(backend.read_cache.peek(&n.key).is_some());
}
for n in nodes {
assert_eq!(backend.get(&n.key).unwrap().ref_count, n.ref_count);
assert_eq!(backend.get_root_count(&n.key), 0);
}
let mut backend = StorageBackend::new(cache_size, D::default());
for n in [&n41, &n31, &n32] {
n.cache_into_backend(&mut backend);
backend.persist(&n.key);
}
backend.flush_all_changes_to_db();
for n in [&n33, &n21, &n22, &n11] {
n.cache_into_backend(&mut backend);
backend.persist(&n.key);
}
for (i, n) in nodes.iter().enumerate() {
for _ in 0..i {
backend.persist(&n.key);
}
}
let root_map = backend.get_roots();
for (i, n) in nodes.iter().enumerate() {
assert_eq!(backend.get(&n.key).unwrap().ref_count, n.ref_count);
assert_eq!(backend.get_root_count(&n.key), (i + 1) as u32);
assert_eq!(root_map.get(&n.key).cloned(), Some((i + 1) as u32));
}
assert_eq!(root_map.len(), nodes.len());
for (i, n) in nodes.iter().enumerate() {
for _ in 0..=i {
backend.unpersist(&n.key);
}
assert_eq!(backend.get_root_count(&n.key), 0);
}
assert_eq!(backend.get_roots(), HashMap::new());
}
/// Runs the shared pre-fetch scenario against the in-memory database.
#[test]
fn pre_fetch_inmemorydb() {
    test_pre_fetch::<InMemoryDB>()
}
/// Runs the shared pre-fetch scenario against `SqlDB` (requires the `sqlite` feature).
#[cfg(feature = "sqlite")]
#[test]
fn pre_fetch_sqldb() {
    test_pre_fetch::<crate::db::SqlDB>()
}
/// Runs the shared pre-fetch scenario against `ParityDb` (requires the `parity-db` feature).
#[cfg(feature = "parity-db")]
#[test]
fn pre_fetch_paritydb() {
    test_pre_fetch::<crate::db::ParityDb>()
}
/// Checks which nodes end up in the read cache after `pre_fetch`, and in what order.
///
/// The fixture is a small DAG rooted at `n11`:
/// `n11 -> {n21, n22}`, `n21 -> {n31, n32}`, `n22 -> {n32, n33}`, and each
/// `n3x -> n41`. For each cache size, every node is cached and flushed to the
/// database, the read cache is emptied, and then `pre_fetch` runs from the root,
/// followed by a `get` of the root. The read cache's iteration order must equal
/// the level-by-level listing `[n11, n21, n22, n31, n32, n33, n41]`, cut down to
/// `cache_size` entries.
fn test_pre_fetch<D: DB>() {
    let n41 = RawNode::new(&[4, 1], 3, vec![]);
    let n31 = RawNode::new(&[3, 1], 1, vec![&n41]);
    let n32 = RawNode::new(&[3, 2], 2, vec![&n41]);
    let n33 = RawNode::new(&[3, 3], 1, vec![&n41]);
    let n21 = RawNode::new(&[2, 1], 1, vec![&n31, &n32]);
    let n22 = RawNode::new(&[2, 2], 1, vec![&n32, &n33]);
    let n11 = RawNode::new(&[1, 1], 0, vec![&n21, &n22]);

    // Children are inserted before their parents.
    let insertion_order = [&n41, &n31, &n32, &n33, &n21, &n22, &n11];
    // Read-cache order the test expects once the root has been pre-fetched.
    let level_order = [&n11, &n21, &n22, &n31, &n32, &n33, &n41];

    let check_with_cache_size = |cache_size: usize| {
        let mut backend = StorageBackend::new(cache_size, D::default());
        for node in insertion_order {
            node.cache_into_backend(&mut backend);
        }
        backend.flush_all_changes_to_db();
        // Start cold so that only `pre_fetch` and `get` populate the read cache.
        backend.read_cache.clear();

        // Unlimited depth, no truncation.
        let max_depth = None;
        let truncate = false;
        backend.pre_fetch(&n11.key, max_depth, truncate);
        let _ = backend.get(&n11.key);

        let cached_keys: std::vec::Vec<_> = backend
            .read_cache
            .iter()
            .map(|(key, _)| key.clone())
            .collect();
        let expected_keys: std::vec::Vec<_> = level_order
            .iter()
            .take(cache_size)
            .map(|node| node.key.clone())
            .collect();
        assert_eq!(cached_keys, expected_keys);
    };

    for cache_size in [1, 3, 7] {
        check_with_cache_size(cache_size);
    }
}
/// Runs the shared garbage-collection scenarios against the in-memory database.
#[test]
fn gc_inmemorydb() {
    test_gc::<InMemoryDB>()
}
/// Runs the shared garbage-collection scenarios against `SqlDB` (requires the `sqlite` feature).
#[cfg(feature = "sqlite")]
#[test]
fn gc_sqldb() {
    test_gc::<crate::db::SqlDB>()
}
/// Runs the shared garbage-collection scenarios against `ParityDb` (requires the `parity-db` feature).
#[cfg(feature = "parity-db")]
#[test]
fn gc_paritydb() {
    test_gc::<crate::db::ParityDb>()
}
/// Exercises `gc` over four scenarios on two overlapping DAGs.
///
/// `n11` roots the `n*` DAG. `o11` roots the `o*` DAG and also references
/// `n21` and `n44` from the `n*` DAG, so the two DAGs share structure.
fn test_gc<D: DB>() {
    use crate::backend::raw_node::RawNode;
    let n41 = RawNode::new(&[1, 4, 1], 1, vec![]);
    let n42 = RawNode::new(&[1, 4, 2], 3, vec![]);
    let n43 = RawNode::new(&[1, 4, 3], 2, vec![]);
    let n44 = RawNode::new(&[1, 4, 4], 2, vec![]);
    let n31 = RawNode::new(&[1, 3, 1], 2, vec![&n41, &n42]);
    let n32 = RawNode::new(&[1, 3, 2], 2, vec![&n42, &n43]);
    let n33 = RawNode::new(&[1, 3, 3], 1, vec![&n43, &n44]);
    let n21 = RawNode::new(&[1, 2, 1], 2, vec![&n31, &n42, &n32]);
    let n22 = RawNode::new(&[1, 2, 2], 1, vec![&n32, &n33]);
    let n11 = RawNode::new(&[1, 1, 1], 0, vec![&n31, &n21, &n22]);
    let o31 = RawNode::new(&[2, 3, 1], 1, vec![]);
    let o32 = RawNode::new(&[2, 3, 2], 1, vec![]);
    let o21 = RawNode::new(&[2, 2, 1], 1, vec![&o31, &o32]);
    let o11 = RawNode::new(&[2, 1, 1], 0, vec![&n21, &n44, &o21]);
    let n_nodes = [&n41, &n42, &n43, &n44, &n31, &n32, &n33, &n21, &n22, &n11];
    let o_nodes = [&o31, &o32, &o21, &o11];
    let mk_backend = || {
        let cache_size = 100;
        StorageBackend::new(cache_size, D::default())
    };

    // Scenario 1: nodes cached and then uncached without ever being flushed
    // leave nothing behind, in the database or in either cache.
    let backend = &mut mk_backend();
    for n in n_nodes.iter().chain(o_nodes.iter()) {
        n.cache_into_backend(backend);
    }
    for n in n_nodes.iter().chain(o_nodes.iter()) {
        backend.uncache(&n.key);
    }
    assert_eq!(backend.database.size(), 0);
    assert_eq!(backend.read_cache.len(), 0);
    assert_eq!(backend.write_cache.len(), 0);

    // Scenario 2: everything is flushed and then uncached with no roots, and
    // `gc` empties the database.
    let backend = &mut mk_backend();
    for n in n_nodes.iter().chain(o_nodes.iter()) {
        n.cache_into_backend(backend);
    }
    backend.flush_all_changes_to_db();
    for n in n_nodes.iter().chain(o_nodes.iter()) {
        backend.uncache(&n.key);
    }
    assert_eq!(backend.database.size(), n_nodes.len() + o_nodes.len());
    backend.gc();
    assert_eq!(backend.database.size(), 0);
    assert_eq!(backend.read_cache.len(), 0);
    assert_eq!(backend.write_cache.len(), 0);

    // Scenario 3: only the `n*` DAG is flushed and uncached, while the `o*` DAG
    // is cached but never flushed. After `gc`, the `n*` nodes reachable from
    // `o11` (`n21`, `n44` and their descendants) remain in the database and the
    // rest of the `n*` DAG is gone.
    let backend = &mut mk_backend();
    for n in n_nodes {
        n.cache_into_backend(backend);
    }
    backend.flush_all_changes_to_db();
    for n in n_nodes {
        backend.uncache(&n.key);
    }
    for n in o_nodes {
        n.cache_into_backend(backend);
    }
    backend.gc();
    let reachable_n_nodes = [&n21, &n31, &n32, &n41, &n42, &n43, &n44];
    let unreachable_n_nodes = [&n11, &n22, &n33];
    for n in unreachable_n_nodes {
        assert!(
            backend.get(&n.key).is_none(),
            "{:?} should have been collected",
            n.key
        );
    }
    for n in o_nodes.iter().chain(reachable_n_nodes.iter()) {
        assert!(
            backend.get(&n.key).is_some(),
            "{:?} should have survived gc",
            n.key
        );
    }
    for n in reachable_n_nodes {
        assert!(
            backend.database.get_node(&n.key).is_some(),
            "{:?} should still be in the database",
            n.key
        );
    }

    // Scenario 4: explicit roots. `n33` and `n44` are persisted before the
    // flush, `o21` is persisted before the `o*` nodes are uncached, and then
    // the root set is adjusted just before `gc`.
    let backend = &mut mk_backend();
    for n in n_nodes {
        n.cache_into_backend(backend);
    }
    backend.persist(&n33.key);
    backend.persist(&n44.key);
    backend.flush_all_changes_to_db();
    for n in n_nodes {
        backend.uncache(&n.key);
    }
    for n in o_nodes {
        n.cache_into_backend(backend);
    }
    backend.persist(&o21.key);
    for n in o_nodes {
        backend.uncache(&n.key);
    }
    backend.unpersist(&n33.key);
    backend.persist(&n31.key);
    // `n44` was already persisted above; this raises its root count to 2.
    backend.persist(&n44.key);
    backend.gc();
    let reachable_n_nodes = [&n31, &n41, &n42, &n44];
    let reachable_o_nodes = [&o21, &o31, &o32];
    let unreachable_n_nodes = [&n11, &n21, &n22, &n32, &n33, &n43];
    let unreachable_o_nodes = [&o11];
    for n in unreachable_n_nodes.iter().chain(unreachable_o_nodes.iter()) {
        assert!(
            backend.get(&n.key).is_none(),
            "{:?} should have been collected",
            n.key
        );
    }
    for n in reachable_o_nodes.iter().chain(reachable_n_nodes.iter()) {
        assert!(
            backend.get(&n.key).is_some(),
            "{:?} should have survived gc",
            n.key
        );
    }
    for n in reachable_n_nodes {
        assert!(
            backend.database.get_node(&n.key).is_some(),
            "{:?} should still be in the database",
            n.key
        );
    }
}
/// Checks the hit and miss counters reported by `get_stats` for `get`.
///
/// A `get` served from memory counts as a hit. A `get` that has to consult the
/// database counts as a miss, whether or not the node is found there.
#[test]
fn backend_stats() {
    // Asserts the backend's cumulative (hits, misses) counters.
    macro_rules! assert_stats {
        ($backend:expr, $hits:expr, $misses:expr) => {{
            let stats = $backend.get_stats();
            assert_eq!(stats.get_cache_hits, $hits);
            assert_eq!(stats.get_cache_misses, $misses);
        }};
    }

    let n1: RawNode = RawNode::new(&[1], 0, vec![]);
    let n2 = RawNode::new(&[2], 0, vec![]);
    let cache_size = 16;
    let mut backend = StorageBackend::new(cache_size, InMemoryDB::default());

    // Fresh backend: nothing has been counted yet.
    assert_stats!(backend, 0, 0);

    // `n1` was just placed in the cache, so reading it is a hit.
    n1.cache_into_backend(&mut backend);
    let _ = backend.get(&n1.key);
    assert_stats!(backend, 1, 0);

    // `n2` exists nowhere yet, so reading it is a miss.
    let _ = backend.get(&n2.key);
    assert_stats!(backend, 1, 1);

    // `n2` is now only in the database, so the first read still misses...
    n2.insert_into_db(&mut backend.database);
    let _ = backend.get(&n2.key);
    assert_stats!(backend, 1, 2);

    // ...and the next read of it is a hit.
    let _ = backend.get(&n2.key);
    assert_stats!(backend, 2, 2);

    // `n1` is still served from memory.
    let _ = backend.get(&n1.key);
    assert_stats!(backend, 3, 2);
}
}