use anyhow::anyhow;
use anyhow::Result;
use core::hash::Hash;
use fnv::FnvHashMap;
use libipld::cbor::DagCbor;
use parking_lot::Mutex;
use std::{
num::NonZeroUsize,
sync::{Arc, RwLock},
};
use weight_cache::{Weighable, WeightCache};
mod thread_local_zstd;
mod zstd_dag_cbor_seq;
pub(crate) use thread_local_zstd::decompress_and_transform;
pub use zstd_dag_cbor_seq::ZstdDagCborSeq;
use crate::{
index::{Branch, CompactSeq, Index},
TreeTypes,
};
pub trait BanyanValue: DagCbor + Send + 'static {}
impl<T: DagCbor + Send + Sync + 'static> BanyanValue for T {}
pub trait BlockWriter<L>: Send + Sync + 'static {
fn put(&mut self, data: Vec<u8>) -> Result<L>;
}
pub trait ReadOnlyStore<L>: Clone + Send + Sync + 'static {
fn get(&self, link: &L) -> Result<Box<[u8]>>;
}
#[derive(Clone)]
pub struct MemReader<R, L> {
inner: R,
store: MemStore<L>,
}
impl<R, L: Copy + Eq + Hash + Send + Sync + 'static> MemReader<R, L>
where
R: ReadOnlyStore<L>,
{
pub fn new(
reader: R,
max_size: usize,
digest: impl Fn(&[u8]) -> L + Send + Sync + 'static,
) -> (Self, MemStore<L>) {
let store = MemStore::new(max_size, digest);
(
MemReader {
inner: reader,
store: store.clone(),
},
store,
)
}
}
impl<R: ReadOnlyStore<L> + Send + Sync + 'static, L: Copy + Eq + Hash + Send + Sync + 'static>
ReadOnlyStore<L> for MemReader<R, L>
{
fn get(&self, link: &L) -> anyhow::Result<Box<[u8]>> {
match self.store.get0(link) {
Some(block) => Ok(block),
None => self.inner.get(link),
}
}
}
#[derive(Clone)]
pub struct MemStore<L>(Arc<Inner<L>>);
struct Inner<L> {
blocks: RwLock<Blocks<L>>,
digest: Arc<dyn Fn(&[u8]) -> L + Send + Sync>,
max_size: usize,
}
#[derive(Debug)]
struct Blocks<L> {
map: FnvHashMap<L, Box<[u8]>>,
current_size: usize,
}
impl<L: Eq + Hash + Copy> MemStore<L> {
pub fn new(max_size: usize, digest: impl Fn(&[u8]) -> L + Send + Sync + 'static) -> Self {
Self(Arc::new(Inner {
digest: Arc::new(digest),
blocks: RwLock::new(Blocks {
map: FnvHashMap::default(),
current_size: 0,
}),
max_size,
}))
}
pub fn into_inner(self) -> anyhow::Result<FnvHashMap<L, Box<[u8]>>> {
let inner = Arc::try_unwrap(self.0).map_err(|_| anyhow!("busy"))?;
let blocks = inner.blocks.into_inner().map_err(|_| anyhow!("poisoned"))?;
Ok(blocks.map)
}
fn get0(&self, link: &L) -> Option<Box<[u8]>> {
let blocks = self.0.as_ref().blocks.read().unwrap();
blocks.map.get(link).cloned()
}
fn put0(&self, data: Vec<u8>) -> anyhow::Result<L> {
let digest = (self.0.digest)(&data);
let len = data.len();
let mut blocks = self.0.blocks.write().unwrap();
if blocks.current_size + data.len() > self.0.max_size {
anyhow::bail!("full");
}
let new = blocks.map.insert(digest, data.into()).is_none();
if new {
blocks.current_size += len;
}
std::mem::drop(blocks);
Ok(digest)
}
}
impl<L: Eq + Hash + Copy + Send + Sync + 'static> ReadOnlyStore<L> for MemStore<L> {
fn get(&self, link: &L) -> anyhow::Result<Box<[u8]>> {
if let Some(value) = self.get0(link) {
Ok(value)
} else {
Err(anyhow!("not there"))
}
}
}
impl<L: Eq + Hash + Send + Sync + Copy + 'static> BlockWriter<L> for MemStore<L> {
fn put(&mut self, data: Vec<u8>) -> anyhow::Result<L> {
self.put0(data)
}
}
impl<T: TreeTypes> Weighable for Branch<T> {
fn measure(value: &Self) -> usize {
let mut bytes = std::mem::size_of::<Branch<T>>();
for child in value.children.iter() {
bytes += std::mem::size_of::<Index<T>>();
match child {
Index::Leaf(leaf) => {
bytes += leaf.keys.estimated_size();
}
Index::Branch(branch) => {
bytes += branch.summaries.estimated_size();
}
}
}
bytes
}
}
type CacheOrBypass<T> = Option<Arc<Mutex<WeightCache<<T as TreeTypes>::Link, Branch<T>>>>>;
#[derive(Debug, Clone)]
pub struct BranchCache<T: TreeTypes>(CacheOrBypass<T>);
impl<T: TreeTypes> Default for BranchCache<T> {
fn default() -> Self {
Self::new(64 << 20)
}
}
impl<T: TreeTypes> BranchCache<T> {
pub fn new(capacity: usize) -> Self {
let cache = if capacity == 0 {
None
} else {
Some(Arc::new(Mutex::new(WeightCache::new(
NonZeroUsize::new(capacity).expect("Cache capacity must be "),
))))
};
Self(cache)
}
pub fn get<'a>(&'a self, link: &'a T::Link) -> Option<Branch<T>> {
self.0.as_ref().and_then(|x| x.lock().get(link).cloned())
}
pub fn put(&self, link: T::Link, branch: Branch<T>) {
if let Some(Err(e)) = self.0.as_ref().map(|x| x.lock().put(link, branch)) {
tracing::warn!("Adding {} to cache failed: {}", link, e);
}
}
pub fn reset(&self, capacity: NonZeroUsize) {
if let Some(cache) = self.0.as_ref() {
let mut cache = cache.lock();
*cache = WeightCache::new(capacity);
}
}
}