pub use ipfs_sqlite_block_store::TempPin;
use ipfs_sqlite_block_store::{
cache::{CacheTracker, InMemCacheTracker, SqliteCacheTracker},
BlockStore, Config, Synchronous,
};
use lazy_static::lazy_static;
use libipld::{codec::References, store::StoreParams, Block, Cid, Ipld, Result};
use parking_lot::Mutex;
use prometheus::{
core::{Collector, Desc},
proto::MetricFamily,
HistogramOpts, HistogramVec, IntCounterVec, IntGauge, Opts, Registry,
};
use std::{future::Future, path::PathBuf, sync::Arc, time::Duration};
use tracing::info;
use crate::executor::{Executor, JoinHandle};
use std::collections::HashSet;
/// Settings used to open a [`StorageService`].
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct StorageConfig {
    /// Location of the block database: an existing file is used directly, any other
    /// path is treated as a directory holding a `db` file. `None` keeps blocks in memory.
    pub path: Option<PathBuf>,
    /// Location of the access-tracking database: an existing file is used directly,
    /// any other path is treated as a directory holding an `access` file. `None`
    /// tracks accesses in memory.
    pub access_db_path: Option<PathBuf>,
    /// Block-count target passed to the store's `with_size_targets`.
    pub cache_size_blocks: u64,
    /// Byte-size target passed to the store's `with_size_targets`.
    pub cache_size_bytes: u64,
    /// Delay between runs of the background GC loop.
    pub gc_interval: Duration,
    /// Block count passed to every `incremental_gc` call.
    pub gc_min_blocks: usize,
    /// Target duration passed to every `incremental_gc` call.
    pub gc_target_duration: Duration,
}

impl StorageConfig {
    /// Builds a configuration whose block-count cache target is `cache_size`.
    ///
    /// The byte target, `gc_min_blocks` and `gc_target_duration` are set to the
    /// largest values their types can represent.
    pub fn new(
        path: Option<PathBuf>,
        access_db_path: Option<PathBuf>,
        cache_size: u64,
        gc_interval: Duration,
    ) -> Self {
        // The largest representable `Duration` (u64::MAX seconds + 999_999_999 ns).
        let longest_gc_duration = Duration::new(u64::MAX, 999_999_999);
        Self {
            path,
            access_db_path,
            gc_interval,
            cache_size_blocks: cache_size,
            cache_size_bytes: u64::MAX,
            gc_min_blocks: usize::MAX,
            gc_target_duration: longest_gc_duration,
        }
    }
}
/// State shared by every clone of a [`StorageService`].
struct StorageServiceInner<S: StoreParams> {
    /// Executor used to run the GC loop and blocking store work.
    executor: Executor,
    /// Primary store connection, shared behind a mutex.
    store: Arc<Mutex<BlockStore<S>>>,
    /// Target duration passed to `incremental_gc`.
    gc_target_duration: Duration,
    /// Block count passed to `incremental_gc`.
    gc_min_blocks: usize,
    /// Handle of the periodic GC task; taken and aborted on drop.
    gc_task: Option<JoinHandle<()>>,
}

impl<S: StoreParams> Drop for StorageServiceInner<S> {
    fn drop(&mut self) {
        // Stop the background GC loop so it does not outlive the service.
        if let Some(handle) = self.gc_task.take() {
            handle.abort();
        }
    }
}
/// Cloneable handle to a block store. All clones share one store, and the
/// background GC task is aborted once the last clone is dropped.
#[derive(Clone)]
pub struct StorageService<S: StoreParams> {
    inner: Arc<StorageServiceInner<S>>,
}

impl<S: StoreParams> StorageService<S>
where
    Ipld: References<S::Codecs>,
{
    /// Opens the store described by `config` and spawns its GC task on `executor`.
    ///
    /// # Errors
    ///
    /// Propagates any error raised while creating directories or opening the
    /// underlying databases.
    pub fn open(config: StorageConfig, executor: Executor) -> Result<Self> {
        StorageServiceInner::open(config, executor).map(|inner| Self {
            inner: Arc::new(inner),
        })
    }
}
impl<S: StoreParams> StorageServiceInner<S>
where
Ipld: References<S::Codecs>,
{
pub fn open(config: StorageConfig, executor: Executor) -> Result<Self> {
let store_config = Config::default()
.with_size_targets(config.cache_size_blocks, config.cache_size_bytes)
.with_pragma_synchronous(Synchronous::Normal);
let tracker: Arc<dyn CacheTracker> = if let Some(path) = config.access_db_path {
let path = if path.is_file() {
path
} else {
std::fs::create_dir_all(&path)?;
path.join("access")
};
Arc::new(SqliteCacheTracker::open(&path, |access, _| Some(access))?)
} else {
Arc::new(InMemCacheTracker::new(|access, _| Some(access)))
};
let is_memory = config.path.is_none();
let store = if let Some(path) = config.path {
let path = if path.is_file() {
path
} else {
std::fs::create_dir_all(&path)?;
path.join("db")
};
BlockStore::open(path, store_config.with_cache_tracker(tracker))?
} else {
BlockStore::memory(store_config.with_cache_tracker(tracker))?
};
let store = Arc::new(Mutex::new(store));
let gc_interval = config.gc_interval;
let gc_min_blocks = config.gc_min_blocks;
let gc_target_duration = config.gc_target_duration;
let gc_task = if is_memory {
let gc = store.clone();
executor.spawn(async move {
loop {
futures_timer::Delay::new(gc_interval).await;
info!("going for gc!");
gc.lock()
.incremental_gc(gc_min_blocks, gc_target_duration)
.map_err(|e| {
tracing::warn!("failure during incremental gc: {:#}", e);
e
})
.ok();
}
})
} else {
let mut gc = store.lock().additional_connection()?;
executor.spawn(async move {
loop {
futures_timer::Delay::new(gc_interval).await;
info!("going for gc!");
gc.incremental_gc(gc_min_blocks, gc_target_duration)
.map_err(|e| {
tracing::warn!("failure during incremental gc: {:#}", e);
e
})
.ok();
}
})
};
Ok(Self {
executor,
gc_target_duration: config.gc_target_duration,
gc_min_blocks: config.gc_min_blocks,
store,
gc_task: Some(gc_task),
})
}
}
impl<S: StoreParams> StorageService<S>
where
    Ipld: References<S::Codecs>,
{
    /// Runs `f` against a fresh transaction while holding the store lock. The
    /// transaction is committed only if `f` succeeds.
    ///
    /// `op` labels the request in `block_store_queries_total` and
    /// `block_store_query_duration`. Time spent waiting for the lock is recorded
    /// under the `lock_wait` label and logged as a warning when it exceeds one second.
    ///
    /// # Errors
    ///
    /// Returns the error from `f`, in which case the transaction is dropped without
    /// being committed. Otherwise returns any error from the commit.
    pub fn rw<F: FnOnce(&mut Batch<'_, S>) -> Result<R>, R>(
        &self,
        op: &'static str,
        f: F,
    ) -> Result<R> {
        QUERIES_TOTAL.with_label_values(&[op]).inc();
        let timer = QUERY_DURATION
            .with_label_values(&["lock_wait"])
            .start_timer();
        let mut lock = self.inner.store.lock();
        let t = timer.stop_and_record();
        if t > 1.0 {
            tracing::warn!(op, "very long storage lock wait time of {:.1}s", t);
        }
        // Held to the end of the function so the timing covers `f` and the commit
        // (prometheus timers observe when dropped).
        let _timer = QUERY_DURATION.with_label_values(&[op]).start_timer();
        let mut txn = Batch(lock.transaction());
        let res = f(&mut txn);
        if res.is_ok() {
            txn.0.commit()?;
        }
        res
    }

    /// Creates a temporary pin.
    pub fn create_temp_pin(&self) -> Result<TempPin> {
        self.rw("create_temp_pin", |x| x.create_temp_pin())
    }

    /// Adds every CID yielded by `iter` to the temporary pin `temp`.
    pub fn temp_pin(
        &self,
        temp: &mut TempPin,
        iter: impl IntoIterator<Item = Cid> + Send + 'static,
    ) -> Result<()> {
        self.rw("temp_pin", |x| x.temp_pin(temp, iter))
    }

    /// Returns the CIDs of all stored blocks.
    pub fn iter(&self) -> Result<impl Iterator<Item = Cid>> {
        self.rw("iter", |x| x.iter())
    }

    /// Reports whether the block `cid` is stored.
    pub fn contains(&self, cid: &Cid) -> Result<bool> {
        self.rw("contains", |x| x.contains(cid))
    }

    /// Returns the bytes of block `cid`, or `None` if it is not stored.
    pub fn get(&self, cid: &Cid) -> Result<Option<Vec<u8>>> {
        self.rw("get", |x| x.get(cid))
    }

    /// Stores `block`.
    pub fn insert(&self, block: Block<S>) -> Result<()> {
        self.rw("insert", |x| x.insert(block))
    }

    /// Points `alias` at `cid`, or removes the alias when `cid` is `None`.
    pub fn alias(&self, alias: &[u8], cid: Option<&Cid>) -> Result<()> {
        self.rw("alias", |x| x.alias(alias, cid))
    }

    /// Lists all `(alias, cid)` pairs.
    pub fn aliases(&self) -> Result<Vec<(Vec<u8>, Cid)>> {
        self.rw("aliases", |x| x.aliases())
    }

    /// Returns the CID that `alias` points at, if any.
    pub fn resolve(&self, alias: &[u8]) -> Result<Option<Cid>> {
        self.rw("resolve", |x| x.resolve(alias))
    }

    /// Returns the set of aliases associated with `cid` by the store's reverse lookup.
    pub fn reverse_alias(&self, cid: &Cid) -> Result<Option<HashSet<Vec<u8>>>> {
        self.rw("reverse_alias", |x| x.reverse_alias(cid))
    }

    /// Returns the CIDs the store reports as missing for the DAG rooted at `cid`.
    pub fn missing_blocks(&self, cid: &Cid) -> Result<Vec<Cid>> {
        self.rw("missing_blocks", |x| x.missing_blocks(cid))
    }

    /// Runs `incremental_gc` on a blocking thread until a pass returns `true`.
    ///
    /// Like [`flush`](Self::flush), the operation is counted and timed in the
    /// block-store metrics, here under the `evict` label.
    pub fn evict(&self) -> impl Future<Output = Result<()>> {
        let store = self.inner.store.clone();
        let gc_min_blocks = self.inner.gc_min_blocks;
        let gc_target_duration = self.inner.gc_target_duration;
        let evict = self.inner.executor.spawn_blocking(move || {
            // The lock is re-acquired for each pass so other users can interleave.
            while !store
                .lock()
                .incremental_gc(gc_min_blocks, gc_target_duration)?
            {
                tracing::trace!("incremental gc pass incomplete, running another pass");
            }
            Ok(())
        });
        async { observe_future("evict", evict).await? }
    }

    /// Flushes the store on a blocking thread, timed under the `flush` label.
    pub fn flush(&self) -> impl Future<Output = Result<()>> {
        let store = self.inner.store.clone();
        let flush = self
            .inner
            .executor
            .spawn_blocking(move || store.lock().flush());
        async { Ok(observe_future("flush", flush).await??) }
    }

    /// Registers the query metrics and a store-statistics collector with `registry`.
    ///
    /// # Errors
    ///
    /// Fails if any of the collectors is rejected by the registry (for example when
    /// already registered).
    pub fn register_metrics(&self, registry: &Registry) -> Result<()> {
        registry.register(Box::new(QUERIES_TOTAL.clone()))?;
        registry.register(Box::new(QUERY_DURATION.clone()))?;
        registry.register(Box::new(SqliteStoreCollector::new(
            self.inner.store.clone(),
        )))?;
        Ok(())
    }
}
lazy_static! {
    /// Number of block store requests, labelled by operation `type`.
    pub static ref QUERIES_TOTAL: IntCounterVec = {
        let opts = Opts::new(
            "block_store_queries_total",
            "Number of block store requests labelled by type.",
        );
        IntCounterVec::new(opts, &["type"]).unwrap()
    };
    /// Durations of block store operations in seconds, labelled by operation `type`.
    /// Time spent acquiring the store lock is recorded under `lock_wait`.
    pub static ref QUERY_DURATION: HistogramVec = {
        // A 1-2-5 progression from 100µs up to 10s.
        let buckets = vec![
            1e-4, 2e-4, 5e-4, 1e-3, 2e-3, 5e-3, 1e-2, 2e-2, 5e-2, 0.1, 0.2, 0.5, 1.0, 2.0, 5.0,
            10.0,
        ];
        let opts = HistogramOpts::new(
            "block_store_query_duration",
            "Duration of store queries labelled by type.",
        )
        .buckets(buckets);
        HistogramVec::new(opts, &["type"]).unwrap()
    };
}
/// Awaits `query` and records it in the block-store metrics under `name`.
///
/// The request counter is always incremented. The duration is observed only when
/// `query` succeeds; the timer is discarded on failure. The result is returned unchanged.
async fn observe_future<T, F>(name: &'static str, query: F) -> Result<T>
where
    F: Future<Output = anyhow::Result<T>>,
{
    QUERIES_TOTAL.with_label_values(&[name]).inc();
    let timer = QUERY_DURATION.with_label_values(&[name]).start_timer();
    let outcome = query.await;
    match &outcome {
        Ok(_) => timer.observe_duration(),
        Err(_) => timer.stop_and_discard(),
    }
    outcome
}
struct SqliteStoreCollector<S: StoreParams> {
store: Arc<Mutex<BlockStore<S>>>,
desc: Desc,
}
impl<S: StoreParams> Collector for SqliteStoreCollector<S>
where
Ipld: References<S::Codecs>,
{
fn desc(&self) -> Vec<&Desc> {
vec![&self.desc]
}
fn collect(&self) -> Vec<MetricFamily> {
let mut family = vec![];
if let Ok(stats) = self.store.lock().get_store_stats() {
let store_block_count =
IntGauge::new("block_store_block_count", "Number of stored blocks").unwrap();
store_block_count.set(stats.count() as _);
family.push(store_block_count.collect()[0].clone());
let store_size =
IntGauge::new("block_store_size", "Size in bytes of stored blocks").unwrap();
store_size.set(stats.size() as _);
family.push(store_size.collect()[0].clone());
}
family
}
}
impl<S: StoreParams> SqliteStoreCollector<S> {
pub fn new(store: Arc<Mutex<BlockStore<S>>>) -> Self {
let desc = Desc::new(
"block_store_stats".into(),
".".into(),
Default::default(),
Default::default(),
)
.unwrap();
Self { store, desc }
}
}
/// A single store transaction, handed to the closure passed to
/// [`StorageService::rw`].
pub struct Batch<'a, S>(ipfs_sqlite_block_store::Transaction<'a, S>);

impl<'a, S> Batch<'a, S>
where
    S: StoreParams,
    Ipld: References<S::Codecs>,
{
    /// Creates a new temporary pin within this transaction.
    pub fn create_temp_pin(&mut self) -> Result<TempPin> {
        let pin = self.0.temp_pin();
        Ok(pin)
    }

    /// Extends `temp` with every CID yielded by `iter`, stopping at the first error.
    pub fn temp_pin(
        &mut self,
        temp: &mut TempPin,
        iter: impl IntoIterator<Item = Cid> + Send + 'static,
    ) -> Result<()> {
        iter.into_iter()
            .try_for_each(|cid| self.0.extend_temp_pin(temp, &cid))?;
        Ok(())
    }

    /// Collects the CIDs of all stored blocks and returns an iterator over them.
    pub fn iter(&mut self) -> Result<impl Iterator<Item = Cid>> {
        self.0
            .get_block_cids::<Vec<Cid>>()
            .map(|cids| cids.into_iter())
            .map_err(Into::into)
    }

    /// Reports whether the block `cid` is stored.
    pub fn contains(&mut self, cid: &Cid) -> Result<bool> {
        self.0.has_block(cid).map_err(Into::into)
    }

    /// Returns the bytes of block `cid`, or `None` if it is not stored.
    pub fn get(&mut self, cid: &Cid) -> Result<Option<Vec<u8>>> {
        self.0.get_block(cid).map_err(Into::into)
    }

    /// Stores `block` without attaching it to a temporary pin.
    pub fn insert(&mut self, block: Block<S>) -> Result<()> {
        self.0.put_block(block, None).map_err(Into::into)
    }

    /// Returns the CID that `alias` points at, if any.
    pub fn resolve(&mut self, alias: &[u8]) -> Result<Option<Cid>> {
        self.0.resolve(alias).map_err(Into::into)
    }

    /// Points `alias` at `cid`, or removes the alias when `cid` is `None`.
    pub fn alias(&mut self, alias: &[u8], cid: Option<&Cid>) -> Result<()> {
        self.0.alias(alias, cid).map_err(Into::into)
    }

    /// Lists all `(alias, cid)` pairs.
    pub fn aliases(&mut self) -> Result<Vec<(Vec<u8>, Cid)>> {
        self.0.aliases().map_err(Into::into)
    }

    /// Returns the aliases the store associates with `cid` via its reverse lookup.
    pub fn reverse_alias(&mut self, cid: &Cid) -> Result<Option<HashSet<Vec<u8>>>> {
        self.0.reverse_alias(cid).map_err(Into::into)
    }

    /// Returns the CIDs the store reports as missing for the DAG rooted at `cid`.
    pub fn missing_blocks(&mut self, cid: &Cid) -> Result<Vec<Cid>> {
        self.0.get_missing_blocks(cid).map_err(Into::into)
    }
}
#[cfg(test)]
mod tests {
// Tests for `StorageService` eviction and alias-based pinning, all run against
// an in-memory store built by `create_store`.
use crate::executor::Executor;
use super::*;
use libipld::{alias, cbor::DagCborCodec, ipld, multihash::Code, store::DefaultParams};
/// Encodes `ipld` as a DAG-CBOR block whose CID uses a BLAKE3-256 multihash.
fn create_block(ipld: &Ipld) -> Block<DefaultParams> {
Block::encode(DagCborCodec, Code::Blake3_256, ipld).unwrap()
}
// Asserts that `reverse_alias` returns `None` for the block. These tests use that
// as the signal that the block has been evicted from the store.
macro_rules! assert_evicted {
($store:expr, $block:expr) => {
assert_eq!($store.reverse_alias($block.cid()).unwrap(), None);
};
}
// Asserts that `reverse_alias` returns a non-empty alias set for the block,
// i.e. at least one alias keeps it pinned.
macro_rules! assert_pinned {
($store:expr, $block:expr) => {
assert_eq!(
$store
.reverse_alias($block.cid())
.unwrap()
.map(|a| !a.is_empty()),
Some(true)
);
};
}
// Asserts that `reverse_alias` returns an empty alias set for the block: it is
// still stored, but no alias pins it.
macro_rules! assert_unpinned {
($store:expr, $block:expr) => {
assert_eq!(
$store
.reverse_alias($block.cid())
.unwrap()
.map(|a| !a.is_empty()),
Some(false)
);
};
}
/// Installs a `tracing` subscriber filtered via `EnvFilter::from_default_env`.
/// Errors from `try_init` (e.g. a subscriber installed by an earlier test) are ignored.
fn tracing_try_init() {
tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.try_init()
.ok();
}
/// Opens an in-memory store (no `path`, no `access_db_path`) with a cache target
/// of 2 blocks and a 100 s GC interval. The interval is presumably long enough
/// that the background GC loop does not fire during a test.
fn create_store() -> StorageService<DefaultParams> {
let config = StorageConfig::new(None, None, 2, Duration::from_secs(100));
StorageService::open(config, Executor::new()).unwrap()
}
/// Checks eviction order under the 2-block cache target. Unaliased blocks are
/// expected to go least-recently-accessed first, and a `get` counts as an access.
#[async_std::test]
async fn test_store_evict() {
tracing_try_init();
let store = create_store();
let a = create_block(&ipld!(0));
let b = create_block(&ipld!(1));
let c = create_block(&ipld!(2));
let d = create_block(&ipld!(3));
// Two blocks fit within the 2-block target, so nothing is evicted yet.
store.insert(a.clone()).unwrap();
store.insert(b.clone()).unwrap();
store.flush().await.unwrap();
store.evict().await.unwrap();
assert_unpinned!(&store, &a);
assert_unpinned!(&store, &b);
// A third block pushes out the oldest one, `a`.
store.insert(c.clone()).unwrap();
store.flush().await.unwrap();
store.evict().await.unwrap();
assert_evicted!(&store, &a);
assert_unpinned!(&store, &b);
assert_unpinned!(&store, &c);
// Reading `b` refreshes it, so `c` becomes the oldest and is evicted once `d` arrives.
store.get(b.cid()).unwrap();
store.insert(d.clone()).unwrap();
store.flush().await.unwrap();
store.evict().await.unwrap();
assert_unpinned!(&store, &b);
assert_evicted!(&store, &c);
assert_unpinned!(&store, &d);
}
/// `b` and `c` both link to `a`. `a` must stay pinned while either aliased root
/// still references it, and becomes unpinned once both aliases are removed.
#[async_std::test]
#[allow(clippy::many_single_char_names)]
async fn test_store_unpin() {
tracing_try_init();
let store = create_store();
let a = create_block(&ipld!({ "a": [] }));
let b = create_block(&ipld!({ "b": [a.cid()] }));
let c = create_block(&ipld!({ "c": [a.cid()] }));
let x = alias!(x).as_bytes().to_vec();
let y = alias!(y).as_bytes().to_vec();
store.insert(a.clone()).unwrap();
store.insert(b.clone()).unwrap();
store.insert(c.clone()).unwrap();
store.alias(&x, Some(b.cid())).unwrap();
store.alias(&y, Some(c.cid())).unwrap();
store.flush().await.unwrap();
assert_pinned!(&store, &a);
assert_pinned!(&store, &b);
assert_pinned!(&store, &c);
// Dropping `x` unpins `b`; `a` stays pinned through `c`.
store.alias(&x, None).unwrap();
store.flush().await.unwrap();
assert_pinned!(&store, &a);
assert_unpinned!(&store, &b);
assert_pinned!(&store, &c);
store.alias(&y, None).unwrap();
store.flush().await.unwrap();
assert_unpinned!(&store, &a);
assert_unpinned!(&store, &b);
assert_unpinned!(&store, &c);
}
/// Two aliases point at the same root `b`. `b` and its child `a` stay pinned
/// until both aliases are removed.
#[async_std::test]
#[allow(clippy::many_single_char_names)]
async fn test_store_unpin2() {
tracing_try_init();
let store = create_store();
let a = create_block(&ipld!({ "a": [] }));
let b = create_block(&ipld!({ "b": [a.cid()] }));
let x = alias!(x).as_bytes().to_vec();
let y = alias!(y).as_bytes().to_vec();
store.insert(a.clone()).unwrap();
store.insert(b.clone()).unwrap();
store.alias(&x, Some(b.cid())).unwrap();
store.alias(&y, Some(b.cid())).unwrap();
store.flush().await.unwrap();
assert_pinned!(&store, &a);
assert_pinned!(&store, &b);
// Removing only one of the two aliases must not unpin anything.
store.alias(&x, None).unwrap();
store.flush().await.unwrap();
assert_pinned!(&store, &a);
assert_pinned!(&store, &b);
store.alias(&y, None).unwrap();
store.flush().await.unwrap();
assert_unpinned!(&store, &a);
assert_unpinned!(&store, &b);
}
}