pub mod async_block_store;
pub mod cache;
mod cidbytes;
mod db;
mod error;
#[cfg(test)]
mod tests;
use crate::cidbytes::CidBytes;
use cache::{BlockInfo, CacheTracker, NoopCacheTracker, WriteInfo};
use db::*;
pub use error::{BlockStoreError, Result};
use fnv::FnvHashSet;
use libipld::{
cid::{self, Cid},
codec::Codec,
store::StoreParams,
Ipld, IpldCodec,
};
use parking_lot::Mutex;
use rusqlite::{Connection, DatabaseName};
use std::{
convert::TryFrom,
fmt,
iter::FromIterator,
ops::DerefMut,
path::Path,
sync::{
atomic::{AtomicI64, Ordering},
Arc,
},
time::Duration,
};
use tracing::*;
/// Target values for the number of blocks and their total size.
///
/// The targets are compared against a [`StoreStats`] snapshot by [`SizeTargets::exceeded`].
/// The derived `Default` sets both targets to zero.
#[derive(Debug, Clone, Copy, Default)]
pub struct SizeTargets {
    /// Target for the number of blocks.
    pub count: u64,
    /// Target for the total size of the blocks.
    pub size: u64,
}

impl SizeTargets {
    /// Creates targets from a block count and a total size.
    pub fn new(count: u64, size: u64) -> Self {
        Self { count, size }
    }

    /// Returns `true` when `stats` is strictly above the count target or strictly above the
    /// size target. Being exactly on a target does not count as exceeding it.
    pub fn exceeded(&self, stats: &StoreStats) -> bool {
        let within_count = stats.count <= self.count;
        let within_size = stats.size <= self.size;
        !(within_count && within_size)
    }

    /// Targets with both limits set to the largest `u64`, so they can never be exceeded.
    pub fn max_value() -> Self {
        Self::new(u64::MAX, u64::MAX)
    }
}
/// Setting for the `synchronous` pragma, stored in [`Config`] as `pragma_synchronous`.
///
/// The `Display` implementation renders each variant as an upper-case keyword.
#[derive(Debug, Clone, Copy)]
pub enum Synchronous {
    /// Rendered as `FULL`.
    Full,
    /// Rendered as `NORMAL`.
    Normal,
    /// Rendered as `OFF`.
    Off,
}

impl fmt::Display for Synchronous {
    /// Writes the upper-case keyword for this variant.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let keyword = match *self {
            Synchronous::Off => "OFF",
            Synchronous::Normal => "NORMAL",
            Synchronous::Full => "FULL",
        };
        f.write_str(keyword)
    }
}
#[derive(Debug)]
pub struct Config {
size_targets: SizeTargets,
cache_tracker: Box<dyn CacheTracker>,
pragma_synchronous: Synchronous,
pragma_cache_pages: u64,
}
impl Default for Config {
fn default() -> Self {
Self {
size_targets: Default::default(),
cache_tracker: Box::new(NoopCacheTracker),
pragma_synchronous: Synchronous::Full,
pragma_cache_pages: 8192,
}
}
}
impl Config {
pub fn with_size_targets(mut self, size_targets: SizeTargets) -> Self {
self.size_targets = size_targets;
self
}
pub fn with_cache_tracker<T: CacheTracker + 'static>(mut self, cache_tracker: T) -> Self {
self.cache_tracker = Box::new(cache_tracker);
self
}
pub fn with_pragma_synchronous(mut self, value: Synchronous) -> Self {
self.pragma_synchronous = value;
self
}
pub fn with_pragma_cache_pages(mut self, value: u64) -> Self {
self.pragma_cache_pages = value;
self
}
}
/// A block store backed by a single rusqlite [`Connection`].
pub struct BlockStore {
/// Connection to the underlying database.
conn: Connection,
/// Ids of temp pins whose [`TempPin`] handle has been dropped. Each handle holds a clone of
/// this `Arc`; the list is emptied by `incremental_gc`, which deletes the pins.
expired_temp_pins: Arc<Mutex<Vec<i64>>>,
/// Configuration supplied when the store was created.
config: Config,
}
/// A block count together with a total size, as returned by `BlockStore::get_store_stats`
/// and checked by `SizeTargets::exceeded`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct StoreStats {
    count: u64,
    size: u64,
}

impl StoreStats {
    /// The recorded block count.
    pub fn count(&self) -> u64 {
        let Self { count, .. } = self;
        *count
    }

    /// The recorded total size.
    pub fn size(&self) -> u64 {
        let Self { size, .. } = self;
        *size
    }
}
/// Handle for a temporary pin in a `BlockStore`.
///
/// A handle starts with id `0`, meaning no database pin has been assigned to it yet. Store
/// operations that take a `TempPin` write the assigned id back into the handle. When a
/// handle with a positive id is dropped, that id is queued in `expired_temp_pins` so the
/// store can delete the pin later.
pub struct TempPin {
    /// Database id of the pin; values `<= 0` mean "not assigned".
    id: AtomicI64,
    /// Queue shared with the owning store, collecting the ids of dropped handles.
    expired_temp_pins: Arc<Mutex<Vec<i64>>>,
}

impl fmt::Debug for TempPin {
    /// Formats as `TempPin { id: N }`, or just `TempPin` while no id has been assigned.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let id = self.id.load(Ordering::SeqCst);
        // Label the output with the real type name; it previously said "TempAlias".
        let mut builder = f.debug_struct("TempPin");
        if id > 0 {
            builder.field("id", &id);
        }
        builder.finish()
    }
}

impl Drop for TempPin {
    /// Queues the pin id for deletion if one was assigned.
    fn drop(&mut self) {
        // `&mut self` gives exclusive access, so the atomic can be read and reset directly.
        let id = self.id.get_mut();
        let assigned = *id;
        if assigned > 0 {
            *id = 0;
            self.expired_temp_pins.lock().push(assigned);
        }
    }
}
/// A block as consumed by this store: a CID plus a byte payload.
///
/// Implemented in this file for `libipld::Block`, [`OwnedBlock`] and a private borrowed
/// variant.
pub trait Block {
/// The CID of this block.
fn cid(&self) -> &Cid;
/// The raw bytes of this block.
fn data(&self) -> &[u8];
}
/// Lets `libipld` blocks be passed to the store directly by forwarding both accessors to the
/// functions of the same name on `libipld::Block`.
impl<S: StoreParams> Block for libipld::Block<S> {
    fn cid(&self) -> &Cid {
        // Called through the type path, exactly as before, rather than as `self.cid()`.
        libipld::Block::cid(self)
    }

    fn data(&self) -> &[u8] {
        libipld::Block::data(self)
    }
}
/// A block that owns both its CID and its data.
#[derive(Debug, Clone)]
pub struct OwnedBlock {
    cid: Cid,
    data: Box<[u8]>,
}

impl OwnedBlock {
    /// Creates a block from a CID and anything convertible into a boxed byte slice.
    ///
    /// The CID is stored as given; it is not checked against `data` here.
    pub fn new(cid: Cid, data: impl Into<Box<[u8]>>) -> Self {
        let data = data.into();
        Self { cid, data }
    }
}

impl Block for OwnedBlock {
    fn cid(&self) -> &Cid {
        &self.cid
    }

    fn data(&self) -> &[u8] {
        &*self.data
    }
}
/// A block that owns its CID but only borrows its data.
///
/// `BlockStore::put_block` uses this to pass a caller's block on without copying the payload.
struct BorrowedBlock<'a> {
    cid: Cid,
    data: &'a [u8],
}

impl<'a> BorrowedBlock<'a> {
    /// Pairs a CID with borrowed bytes.
    fn new(cid: Cid, data: &'a [u8]) -> Self {
        BorrowedBlock { cid, data }
    }
}

impl Block for BorrowedBlock<'_> {
    fn cid(&self) -> &Cid {
        &self.cid
    }

    fn data(&self) -> &[u8] {
        self.data
    }
}
/// Collects the CIDs referenced from the data of `block`.
///
/// The codec is derived from the codec code in the block's CID.
///
/// # Errors
///
/// Fails if that code does not convert to an [`IpldCodec`], or if the codec reports an
/// error while scanning the data for references.
fn links(block: &impl Block) -> anyhow::Result<Vec<Cid>> {
    let codec = IpldCodec::try_from(block.cid().codec())?;
    let mut found = Vec::new();
    codec.references::<Ipld, Vec<_>>(block.data(), &mut found)?;
    Ok(found)
}
impl BlockStore {
pub fn memory(config: Config) -> crate::Result<Self> {
let mut conn = Connection::open_in_memory()?;
init_db(
&mut conn,
true,
config.pragma_cache_pages as i64,
config.pragma_synchronous,
)?;
Ok(Self {
conn,
expired_temp_pins: Arc::new(Mutex::new(Vec::new())),
config,
})
}
pub fn open(path: impl AsRef<Path>, config: Config) -> crate::Result<Self> {
let mut conn = Connection::open(path)?;
init_db(
&mut conn,
false,
config.pragma_cache_pages as i64,
config.pragma_synchronous,
)?;
let ids = in_txn(&mut conn, |txn| get_ids(txn))?;
config.cache_tracker.retain_ids(&ids);
Ok(Self {
conn,
expired_temp_pins: Arc::new(Mutex::new(Vec::new())),
config,
})
}
pub fn open_test(path: impl AsRef<Path>, config: Config) -> crate::Result<Self> {
let mut conn = Connection::open_in_memory()?;
debug!(
"Restoring in memory database from {}",
path.as_ref().display()
);
conn.restore(
DatabaseName::Main,
path,
Some(|p: rusqlite::backup::Progress| {
let percent = if p.pagecount == 0 {
100
} else {
(p.pagecount - p.remaining) * 100 / p.pagecount
};
if percent % 10 == 0 {
debug!("Restoring: {} %", percent);
}
}),
)?;
let ids = in_txn(&mut conn, |txn| get_ids(txn))?;
config.cache_tracker.retain_ids(&ids);
Ok(Self {
conn,
expired_temp_pins: Arc::new(Mutex::new(Vec::new())),
config,
})
}
pub fn flush(&self) -> crate::Result<()> {
Ok(self.conn.pragma_update(None, "wal_checkpoint", &"FULL")?)
}
pub fn integrity_check(&self) -> crate::Result<()> {
let result = integrity_check(&self.conn)?;
if result == vec!["ok".to_owned()] {
Ok(())
} else {
let error_text = result.join(";");
Err(crate::error::BlockStoreError::SqliteError(
rusqlite::Error::SqliteFailure(rusqlite::ffi::Error::new(11), Some(error_text)),
))
}
}
pub fn temp_pin(&self) -> TempPin {
TempPin {
id: AtomicI64::new(0),
expired_temp_pins: self.expired_temp_pins.clone(),
}
}
pub fn alias(&mut self, name: impl AsRef<[u8]>, link: Option<&Cid>) -> crate::Result<()> {
self.alias_many(std::iter::once((name, link.cloned())))
}
pub fn resolve(&self, name: impl AsRef<[u8]>) -> crate::Result<Option<Cid>> {
in_ro_txn(&self.conn, |txn| {
Ok(resolve::<CidBytes>(txn, name.as_ref())?
.map(|c| Cid::try_from(&c))
.transpose()?)
})
}
pub fn alias_many(
&mut self,
aliases: impl IntoIterator<Item = (impl AsRef<[u8]>, Option<Cid>)>,
) -> crate::Result<()> {
in_txn(&mut self.conn, |txn| {
for (name, link) in aliases.into_iter() {
let link: Option<CidBytes> = link.map(|x| CidBytes::try_from(&x)).transpose()?;
alias(txn, name.as_ref(), link.as_ref())?;
}
Ok(())
})
}
pub fn assign_temp_pin(
&mut self,
pin: &TempPin,
links: impl IntoIterator<Item = Cid>,
) -> crate::Result<()> {
let pin0 = pin.id.load(Ordering::SeqCst);
let pin0 = in_txn(&mut self.conn, |txn| {
let links = links
.into_iter()
.map(|x| CidBytes::try_from(&x))
.collect::<std::result::Result<Vec<_>, cid::Error>>()?;
assign_temp_pin(txn, pin0, links)
})?;
pin.id.store(pin0, Ordering::SeqCst);
Ok(())
}
pub fn reverse_alias(&self, cid: &Cid) -> crate::Result<Option<Vec<Vec<u8>>>> {
let cid = CidBytes::try_from(cid)?;
in_ro_txn(&self.conn, |txn| reverse_alias(txn, cid.as_ref()))
}
pub fn has_cid(&self, cid: &Cid) -> Result<bool> {
let cid = CidBytes::try_from(cid)?;
in_ro_txn(&self.conn, |txn| has_cid(txn, cid))
}
pub fn has_block(&mut self, cid: &Cid) -> Result<bool> {
let cid = CidBytes::try_from(cid)?;
in_ro_txn(&self.conn, |txn| has_block(txn, cid))
}
pub fn has_blocks<I, O>(&self, cids: I) -> Result<O>
where
I: IntoIterator<Item = Cid>,
O: FromIterator<(Cid, bool)>,
{
in_ro_txn(&self.conn, |txn| {
cids.into_iter()
.map(|cid| -> Result<(Cid, bool)> {
Ok((cid, has_block(txn, CidBytes::try_from(&cid)?)?))
})
.collect::<crate::Result<O>>()
})
}
pub fn get_store_stats(&self) -> Result<StoreStats> {
in_ro_txn(&self.conn, get_store_stats)
}
pub fn get_known_cids<C: FromIterator<Cid>>(&mut self) -> Result<C> {
let res = in_ro_txn(&self.conn, |txn| Ok(get_known_cids::<CidBytes>(txn)?))?;
let res = res.iter().map(Cid::try_from).collect::<cid::Result<C>>()?;
Ok(res)
}
pub fn get_block_cids<C: FromIterator<Cid>>(&self) -> Result<C> {
let res = in_ro_txn(&self.conn, |txn| Ok(get_block_cids::<CidBytes>(txn)?))?;
let res = res.iter().map(Cid::try_from).collect::<cid::Result<C>>()?;
Ok(res)
}
pub fn get_descendants<C: FromIterator<Cid>>(&self, cid: &Cid) -> Result<C> {
let cid = CidBytes::try_from(cid)?;
let res = in_ro_txn(&self.conn, move |txn| get_descendants(txn, cid))?;
let res = res.iter().map(Cid::try_from).collect::<cid::Result<C>>()?;
Ok(res)
}
pub fn get_missing_blocks<C: FromIterator<Cid>>(&self, cid: &Cid) -> Result<C> {
let cid = CidBytes::try_from(cid)?;
let result = log_execution_time("get_missing_blocks", Duration::from_millis(10), || {
in_ro_txn(&self.conn, move |txn| get_missing_blocks(txn, cid))
})?;
let res = result
.iter()
.map(Cid::try_from)
.collect::<cid::Result<C>>()?;
Ok(res)
}
pub fn aliases<C: FromIterator<(Vec<u8>, Cid)>>(&self) -> Result<C> {
let result: Vec<(Vec<u8>, CidBytes)> = in_ro_txn(&self.conn, aliases)?;
let res = result
.into_iter()
.map(|(alias, cid)| {
let cid = Cid::try_from(&cid)?;
Ok((alias, cid))
})
.collect::<cid::Result<C>>()?;
Ok(res)
}
pub fn vacuum(&self) -> Result<()> {
vacuum(&self.conn)
}
pub fn gc(&mut self) -> Result<()> {
loop {
let complete = self.incremental_gc(20000, Duration::from_secs(1))?;
while !self.incremental_delete_orphaned(20000, Duration::from_secs(1))? {}
if complete {
break;
}
}
Ok(())
}
pub fn incremental_gc(&mut self, min_blocks: usize, max_duration: Duration) -> Result<bool> {
let expired_temp_pins = {
let mut result = Vec::new();
std::mem::swap(self.expired_temp_pins.lock().deref_mut(), &mut result);
result
};
let (deleted, complete) = log_execution_time("gc", Duration::from_secs(1), || {
let size_targets = self.config.size_targets;
let cache_tracker = &self.config.cache_tracker;
in_txn(&mut self.conn, move |txn| {
for id in expired_temp_pins {
delete_temp_pin(txn, id)?;
}
Ok(incremental_gc(
&txn,
min_blocks,
max_duration,
size_targets,
cache_tracker,
)?)
})
})?;
self.config.cache_tracker.blocks_deleted(deleted);
Ok(complete)
}
pub fn incremental_delete_orphaned(
&mut self,
min_blocks: usize,
max_duration: Duration,
) -> Result<bool> {
Ok(log_execution_time(
"delete_orphaned",
Duration::from_millis(100),
|| {
in_txn(&mut self.conn, move |txn| {
Ok(incremental_delete_orphaned(txn, min_blocks, max_duration)?)
})
},
)?)
}
pub fn put_blocks<B: Block>(
&mut self,
blocks: impl IntoIterator<Item = B>,
pin: Option<&TempPin>,
) -> Result<()> {
let mut pin0 = pin.map(|pin| pin.id.load(Ordering::SeqCst));
let infos = in_txn(&mut self.conn, |txn| {
Ok(blocks
.into_iter()
.map(|block| {
let cid_bytes = CidBytes::try_from(block.cid())?;
let links = links(&block)?
.iter()
.map(CidBytes::try_from)
.collect::<std::result::Result<FnvHashSet<_>, cid::Error>>()?;
let res = put_block(txn, &cid_bytes, &block.data(), links, &mut pin0)?;
Ok(WriteInfo::new(
BlockInfo::new(res.id, block.cid(), block.data().len()),
res.block_exists,
))
})
.collect::<Result<Vec<_>>>()?)
})?;
if let (Some(pin), Some(p)) = (pin, pin0) {
pin.id.store(p, Ordering::SeqCst);
}
self.config.cache_tracker.blocks_written(infos);
Ok(())
}
pub fn put_block(&mut self, block: &impl Block, alias: Option<&TempPin>) -> Result<()> {
let bb = BorrowedBlock::new(*block.cid(), block.data());
self.put_blocks(Some(bb), alias)?;
Ok(())
}
pub fn get_blocks<I>(&self, cids: I) -> Result<impl Iterator<Item = (Cid, Option<Vec<u8>>)>>
where
I: IntoIterator<Item = Cid>,
{
let res = in_ro_txn(&self.conn, |txn| {
cids.into_iter()
.map(|cid| Ok((cid, get_block(txn, &CidBytes::try_from(&cid)?)?)))
.collect::<crate::Result<Vec<_>>>()
})?;
let infos = res
.iter()
.filter_map(|(cid, res)| {
res.as_ref()
.map(|(id, data)| BlockInfo::new(*id, cid, data.len()))
})
.collect::<Vec<_>>();
self.config.cache_tracker.blocks_accessed(infos);
Ok(res
.into_iter()
.map(|(cid, res)| (cid, res.map(|(_, data)| data))))
}
pub fn get_block(&self, cid: &Cid) -> Result<Option<Vec<u8>>> {
Ok(self.get_blocks(std::iter::once(*cid))?.next().unwrap().1)
}
}