use crate::{Block, BlockStore, StoreStats, TempPin};
use futures::future::BoxFuture;
use futures::prelude::*;
use libipld::Cid;
use parking_lot::Mutex;
use std::{iter::FromIterator, sync::Arc, time::Duration};
use tracing::*;
/// Handle that exposes a synchronous [`BlockStore`] through future-returning methods.
///
/// The store is shared behind an `Arc<Mutex<_>>`, so cloning this handle (when `R: Clone`)
/// yields another handle to the same underlying store rather than a copy of it.
#[derive(Clone)]
pub struct AsyncBlockStore<R> {
/// Shared, mutex-guarded store; locked for the duration of each dispatched operation.
inner: Arc<Mutex<BlockStore>>,
/// Runtime adapter used to run store closures and to sleep between gc steps.
runtime: R,
}
impl AsyncTempPin {
fn new(alias: TempPin) -> Self {
Self(Arc::new(alias))
}
}
/// Cloneable, reference-counted wrapper around a [`TempPin`].
///
/// All clones refer to the same underlying `TempPin`.
/// NOTE(review): presumably the pin is released once the last clone is dropped; that depends
/// on `TempPin`'s own drop behavior, which is not visible here - confirm.
#[derive(Debug, Clone)]
pub struct AsyncTempPin(Arc<TempPin>);
/// Abstraction over the async runtime that [`AsyncBlockStore`] is driven by.
///
/// Implementors must be `Clone + 'static`; `AsyncBlockStore` clones the adapter for every
/// operation it dispatches.
pub trait RuntimeAdapter: Clone + 'static {
/// Consumes the adapter and returns a boxed `'static` future that resolves to the value
/// produced by `f`, wrapped in an `anyhow::Result`.
///
/// NOTE(review): presumably `f` is executed somewhere blocking is acceptable (e.g. a
/// blocking thread pool) and `Err` reports a failure to run or join that task - confirm
/// against the implementors.
fn unblock<F, T>(self, f: F) -> BoxFuture<'static, anyhow::Result<T>>
where
F: FnOnce() -> T + Send + 'static,
T: Send + 'static;
/// Returns a boxed future, borrowing the adapter, that resolves to `()`.
///
/// NOTE(review): presumably the future completes after roughly `duration` has elapsed -
/// confirm against the implementors.
fn sleep(&self, duration: Duration) -> BoxFuture<()>;
}
/// Boxed `'static` future resolving to a `crate::Result<T>`; the return type of most
/// [`AsyncBlockStore`] methods.
type AsyncResult<T> = BoxFuture<'static, crate::Result<T>>;
/// Future-returning facade over the synchronous [`BlockStore`] API.
///
/// Almost every method hands a closure to the private `unblock` helper, which locks the
/// shared store inside the closure and runs it through the [`RuntimeAdapter`]. The returned
/// futures are `'static`, so they do not borrow the handle they were created from.
impl<R: RuntimeAdapter> AsyncBlockStore<R> {
    /// Takes ownership of `store` and places it behind a new `Arc<Mutex<_>>`.
    pub fn new(runtime: R, store: BlockStore) -> Self {
        Self::wrap(runtime, Arc::new(Mutex::new(store)))
    }

    /// Builds a handle around a store that is already shared.
    pub fn wrap(runtime: R, inner: Arc<Mutex<BlockStore>>) -> Self {
        Self { inner, runtime }
    }

    /// Borrows the shared, mutex-guarded store backing this handle.
    pub fn inner(&self) -> &Arc<Mutex<BlockStore>> {
        &self.inner
    }

    /// Calls `BlockStore::temp_pin` and wraps the resulting pin in an [`AsyncTempPin`].
    pub fn temp_pin(&self) -> AsyncResult<AsyncTempPin> {
        self.unblock(|db| {
            let pin = db.temp_pin();
            Ok(AsyncTempPin::new(pin))
        })
    }

    /// Forwards to `BlockStore::alias` with `name` and the optional `link`.
    pub fn alias(&self, name: Vec<u8>, link: Option<Cid>) -> AsyncResult<()> {
        self.unblock(move |db| db.alias(&name, link.as_ref()))
    }

    /// Forwards to `BlockStore::assign_temp_pin`, passing the `TempPin` held by `pin`
    /// together with `links`.
    pub fn assign_temp_pin(
        &self,
        pin: AsyncTempPin,
        links: impl IntoIterator<Item = Cid> + Send + 'static,
    ) -> AsyncResult<()> {
        self.unblock(move |db| db.assign_temp_pin(&pin.0, links))
    }

    /// Forwards to `BlockStore::resolve` for `name`.
    pub fn resolve(&self, name: Vec<u8>) -> AsyncResult<Option<Cid>> {
        self.unblock(move |db| db.resolve(&name))
    }

    /// Forwards the whole batch of `(name, optional link)` pairs to `BlockStore::alias_many`.
    pub fn alias_many(
        &self,
        aliases: impl IntoIterator<Item = (impl AsRef<[u8]>, Option<Cid>)> + Send + 'static,
    ) -> AsyncResult<()> {
        self.unblock(move |db| db.alias_many(aliases))
    }

    /// Forwards to `BlockStore::flush`.
    pub fn flush(&self) -> AsyncResult<()> {
        self.unblock(|db| db.flush())
    }

    /// Forwards to `BlockStore::gc`.
    pub fn gc(&self) -> AsyncResult<()> {
        self.unblock(|db| db.gc())
    }

    /// Forwards to `BlockStore::incremental_gc` and awaits the outcome.
    ///
    /// Unlike its siblings this is an `async fn`, so the returned future borrows `self`.
    /// The meaning of the returned flag is defined by `BlockStore::incremental_gc`.
    pub async fn incremental_gc(
        &self,
        min_blocks: usize,
        max_duration: Duration,
    ) -> crate::Result<bool> {
        let pending = self.unblock(move |db| db.incremental_gc(min_blocks, max_duration));
        pending.await
    }

    /// Forwards to `BlockStore::incremental_delete_orphaned`.
    ///
    /// The meaning of the returned flag is defined by that method.
    pub fn incremental_delete_orphaned(
        &self,
        min_blocks: usize,
        max_duration: Duration,
    ) -> AsyncResult<bool> {
        self.unblock(move |db| db.incremental_delete_orphaned(min_blocks, max_duration))
    }

    /// Forwards to `BlockStore::get_block` for `cid`.
    pub fn get_block(&self, cid: Cid) -> AsyncResult<Option<Vec<u8>>> {
        self.unblock(move |db| db.get_block(&cid))
    }

    /// Forwards to `BlockStore::get_blocks`, resolving to an iterator of
    /// `(cid, optional data)` pairs.
    pub fn get_blocks<I: IntoIterator<Item = Cid> + Send + 'static>(
        &self,
        cids: I,
    ) -> AsyncResult<impl Iterator<Item = (Cid, Option<Vec<u8>>)>> {
        self.unblock(move |db| db.get_blocks(cids))
    }

    /// Forwards to `BlockStore::has_block`.
    ///
    /// The cid is copied up front so the returned future does not borrow the argument.
    pub fn has_block(&self, cid: &Cid) -> AsyncResult<bool> {
        let owned = *cid;
        self.unblock(move |db| db.has_block(&owned))
    }

    /// Forwards to `BlockStore::has_blocks`, collecting `(cid, bool)` pairs into the
    /// caller-chosen container `O`.
    pub fn has_blocks<I, O>(&self, cids: I) -> AsyncResult<O>
    where
        I: IntoIterator<Item = Cid> + Send + 'static,
        O: FromIterator<(Cid, bool)> + Send + 'static,
    {
        self.unblock(move |db| db.has_blocks(cids))
    }

    /// Forwards to `BlockStore::has_cid` for `cid`.
    pub fn has_cid(&self, cid: Cid) -> AsyncResult<bool> {
        self.unblock(move |db| db.has_cid(&cid))
    }

    /// Forwards to `BlockStore::get_missing_blocks` for `cid`, collecting into `C`.
    pub fn get_missing_blocks<C: FromIterator<Cid> + Send + 'static>(
        &self,
        cid: Cid,
    ) -> AsyncResult<C> {
        self.unblock(move |db| db.get_missing_blocks(&cid))
    }

    /// Forwards to `BlockStore::get_descendants` for `cid`, collecting into `C`.
    pub fn get_descendants<C: FromIterator<Cid> + Send + 'static>(
        &self,
        cid: Cid,
    ) -> AsyncResult<C> {
        self.unblock(move |db| db.get_descendants(&cid))
    }

    /// Forwards to `BlockStore::reverse_alias` for `cid`.
    pub fn reverse_alias(&self, cid: Cid) -> AsyncResult<Option<Vec<Vec<u8>>>> {
        self.unblock(move |db| db.reverse_alias(&cid))
    }

    /// Forwards to `BlockStore::get_known_cids`, collecting into `C`.
    pub fn get_known_cids<C: FromIterator<Cid> + Send + 'static>(&self) -> AsyncResult<C> {
        self.unblock(move |db| db.get_known_cids())
    }

    /// Forwards to `BlockStore::get_block_cids`, collecting into `C`.
    pub fn get_block_cids<C: FromIterator<Cid> + Send + 'static>(&self) -> AsyncResult<C> {
        self.unblock(move |db| db.get_block_cids())
    }

    /// Forwards to `BlockStore::get_store_stats`.
    pub fn get_store_stats(&self) -> AsyncResult<StoreStats> {
        self.unblock(move |db| db.get_store_stats())
    }

    /// Forwards to `BlockStore::put_blocks`, passing the `TempPin` behind `alias` when one
    /// is supplied.
    pub fn put_blocks<B: Block + Send + 'static>(
        &self,
        blocks: impl IntoIterator<Item = B> + Send + 'static,
        alias: Option<AsyncTempPin>,
    ) -> AsyncResult<()> {
        self.unblock(move |db| {
            // Borrow the shared pin out of the Arc for the duration of the call.
            let pin = alias.as_ref().map(|handle| &*handle.0);
            db.put_blocks(blocks, pin)
        })
    }

    /// Forwards to `BlockStore::put_block`, passing the `TempPin` behind `alias` when one
    /// is supplied.
    ///
    /// The pin handle is cloned first because the returned future must be `'static`.
    pub fn put_block(
        &self,
        block: impl Block + Send + 'static,
        alias: Option<&AsyncTempPin>,
    ) -> AsyncResult<()> {
        let owned_alias = alias.cloned();
        self.unblock(move |db| {
            let pin = owned_alias.as_ref().map(|handle| &*handle.0);
            db.put_block(&block, pin)
        })
    }

    /// Alternates incremental gc and incremental orphan deletion, sleeping half of
    /// `config.interval` before the first step and after every step.
    ///
    /// Returns `Ok(())` once this handle holds the only remaining strong reference to the
    /// shared store, and returns early with the error if either step fails.
    pub async fn gc_loop(self, config: GcConfig) -> crate::Result<()> {
        let GcConfig {
            interval,
            min_blocks,
            target_duration,
        } = config;
        let pause = interval / 2;

        self.runtime.sleep(pause).await;
        loop {
            if self.ref_count() <= 1 {
                return Ok(());
            }
            debug!("gc_loop running incremental gc");
            self.incremental_gc(min_blocks, target_duration).await?;
            self.runtime.sleep(pause).await;
            debug!("gc_loop running incremental delete orphaned");
            self.incremental_delete_orphaned(min_blocks, target_duration)
                .await?;
            self.runtime.sleep(pause).await;
        }
    }

    /// Runs `f` against the locked store through the runtime adapter.
    ///
    /// The adapter's `anyhow` error is converted into the crate error type, and the two
    /// nested results (adapter outcome, store outcome) are collapsed into one.
    fn unblock<T: Send + 'static>(
        &self,
        f: impl FnOnce(&mut BlockStore) -> crate::Result<T> + Send + 'static,
    ) -> AsyncResult<T> {
        let shared = Arc::clone(&self.inner);
        self.runtime
            .clone()
            .unblock(move || {
                // The guard lives until the closure returns, i.e. for all of `f`.
                let mut guard = shared.lock();
                f(&mut guard)
            })
            .err_into()
            .map(|outcome| outcome.and_then(|store_result| store_result))
            .boxed()
    }
}
impl<R> AsyncBlockStore<R> {
    /// Number of strong `Arc` references currently pointing at the shared store,
    /// including the one held by this handle.
    pub fn ref_count(&self) -> usize {
        let Self { inner, .. } = self;
        Arc::strong_count(inner)
    }
}
/// Settings consumed by `AsyncBlockStore::gc_loop`.
#[derive(PartialEq, Eq, Clone, Debug)]
pub struct GcConfig {
    /// Length of one full gc cycle; `gc_loop` sleeps half of this after each of its two steps.
    pub interval: Duration,
    /// Passed as the `min_blocks` argument of the incremental gc and orphan-deletion calls.
    pub min_blocks: usize,
    /// Passed as the `max_duration` argument of the incremental gc and orphan-deletion calls.
    pub target_duration: Duration,
}

impl GcConfig {
    /// Returns this configuration with `interval` replaced; all other fields are kept.
    pub fn with_interval(mut self, interval: Duration) -> Self {
        self.interval = interval;
        self
    }
}

impl Default for GcConfig {
    /// Defaults: 60 second interval, 10000 minimum blocks, 1 second target duration.
    fn default() -> Self {
        let interval = Duration::from_secs(60);
        let target_duration = Duration::from_secs(1);
        GcConfig {
            interval,
            min_blocks: 10_000,
            target_duration,
        }
    }
}