pub mod cache;
mod cidbytes;
mod db;
mod error;
#[cfg(test)]
mod tests;
mod transaction;
use cache::{CacheTracker, NoopCacheTracker};
use db::*;
use error::Context;
pub use error::{BlockStoreError, Result};
use libipld::{codec::References, store::StoreParams, Block, Cid, Ipld};
use parking_lot::Mutex;
use rusqlite::{Connection, DatabaseName, OpenFlags};
use std::{
borrow::Cow,
collections::HashSet,
fmt,
iter::FromIterator,
marker::PhantomData,
mem,
ops::DerefMut,
path::{Path, PathBuf},
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
time::Duration,
};
use tracing::*;
pub use transaction::Transaction;
/// Location of the sqlite database backing a block store.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum DbPath {
    File(PathBuf),
    Memory,
}
impl DbPath {
    /// True exactly when this is the in-memory variant.
    fn is_memory(&self) -> bool {
        match self {
            DbPath::File(_) => false,
            DbPath::Memory => true,
        }
    }
}
/// Size targets consulted by garbage collection (see `gc` / `incremental_gc`).
#[derive(Debug, Clone, Copy, Default)]
pub struct SizeTargets {
    /// Target maximum number of blocks.
    pub count: u64,
    /// Target maximum total size, in bytes.
    pub size: u64,
}
impl SizeTargets {
    pub fn new(count: u64, size: u64) -> Self {
        Self { count, size }
    }
    /// True if `stats` exceeds either the count or the size target.
    pub fn exceeded(&self, stats: &StoreStats) -> bool {
        stats.count > self.count || stats.size > self.size
    }
    /// Targets so large they can never be exceeded.
    pub fn max_value() -> Self {
        // `u64::MAX` replaces the soft-deprecated `u64::max_value()`.
        Self {
            count: u64::MAX,
            size: u64::MAX,
        }
    }
}
/// Value for the sqlite `synchronous` pragma.
#[derive(Debug, Clone, Copy)]
pub enum Synchronous {
    Full,
    Normal,
    Off,
}
impl fmt::Display for Synchronous {
    /// Renders the variant exactly as sqlite spells it in `PRAGMA synchronous = …`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let text = match self {
            Self::Full => "FULL",
            Self::Normal => "NORMAL",
            Self::Off => "OFF",
        };
        write!(f, "{}", text)
    }
}
/// Configuration for a block store.
#[derive(Debug, Clone)]
pub struct Config {
    // Soft size targets consulted by GC (see `gc` and `incremental_gc`).
    size_targets: SizeTargets,
    // Tracker passed to `incremental_gc` and informed of retained ids on open.
    cache_tracker: Arc<dyn CacheTracker>,
    // Value for the sqlite `synchronous` pragma.
    pragma_synchronous: Synchronous,
    // Page cache size handed to `init_db` / `init_pragmas` (in pages).
    pragma_cache_pages: u64,
    // Open the database read-only (no CREATE flag is added in that case).
    read_only: bool,
    // Add SQLITE_OPEN_CREATE when opening a writable file-backed DB.
    create: bool,
}
impl Default for Config {
fn default() -> Self {
Self {
size_targets: Default::default(),
cache_tracker: Arc::new(NoopCacheTracker),
pragma_synchronous: Synchronous::Full, pragma_cache_pages: 8192, read_only: false,
create: true,
}
}
}
impl Config {
pub fn with_read_only(mut self, value: bool) -> Self {
self.read_only = value;
self
}
pub fn with_size_targets(mut self, count: u64, size: u64) -> Self {
self.size_targets = SizeTargets { count, size };
self
}
pub fn with_cache_tracker<T: CacheTracker + 'static>(mut self, cache_tracker: T) -> Self {
self.cache_tracker = Arc::new(cache_tracker);
self
}
pub fn with_pragma_synchronous(mut self, value: Synchronous) -> Self {
self.pragma_synchronous = value;
self
}
pub fn with_pragma_cache_pages(mut self, value: u64) -> Self {
self.pragma_cache_pages = value;
self
}
}
/// A sqlite-backed block store, parameterized over the store params `S`.
pub struct BlockStore<S> {
    // The underlying sqlite connection; all operations go through it.
    conn: Connection,
    // Ids of dropped `TempPin`s awaiting deletion (drained by `cleanup_temp_pins`).
    // Shared with every `TempPin` and every `additional_connection` handle.
    expired_temp_pins: Arc<Mutex<Vec<i64>>>,
    config: Config,
    db_path: DbPath,
    // Set to true once the background store-stats recomputation has finished
    // (immediately true for in-memory stores). Gates `maybe_checkpoint`.
    recompute_done: Arc<AtomicBool>,
    // `S` only parameterizes the API; nothing of type `S` is stored.
    _s: PhantomData<S>,
}
/// Statistics about the store, as returned by `get_store_stats`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct StoreStats {
    count: u64,
    size: u64,
    page_size: u64,
    used_pages: u64,
    free_pages: u64,
}
impl StoreStats {
    /// Number of blocks in the store.
    pub fn count(&self) -> u64 {
        self.count
    }
    /// Total size of blocks, in bytes.
    pub fn size(&self) -> u64 {
        self.size
    }
    /// Size of a database page, in bytes.
    pub fn page_size(&self) -> u64 {
        self.page_size
    }
    /// Number of pages currently in use.
    pub fn used_pages(&self) -> u64 {
        self.used_pages
    }
    /// Number of free (reclaimable) pages.
    pub fn free_pages(&self) -> u64 {
        self.free_pages
    }
}
/// A temporary pin handle.
///
/// `id == 0` means the pin has not yet been associated with anything; a
/// positive id is reported to the shared expiry list when the pin is dropped.
pub struct TempPin {
    id: i64,
    // Shared with the owning `BlockStore`; drained by `cleanup_temp_pins`.
    expired_temp_pins: Arc<Mutex<Vec<i64>>>,
}
impl TempPin {
    /// Create an unused pin (id 0) that reports into `expired_temp_pins` on drop.
    fn new(expired_temp_pins: Arc<Mutex<Vec<i64>>>) -> Self {
        Self {
            id: 0,
            expired_temp_pins,
        }
    }
}
impl fmt::Debug for TempPin {
    /// Debug-formats the pin, showing its id when used and `unused: true`
    /// otherwise.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Use the type's actual name; this previously printed the stale name
        // "TempAlias" left over from an earlier rename.
        let mut builder = f.debug_struct("TempPin");
        if self.id > 0 {
            builder.field("id", &self.id);
        } else {
            // id 0 means the pin was never attached to anything.
            builder.field("unused", &true);
        }
        builder.finish()
    }
}
impl Drop for TempPin {
    /// Hand the id over to the shared expiry list so the store can delete the
    /// pin during the next `cleanup_temp_pins`. Unused pins (id 0) are skipped.
    fn drop(&mut self) {
        if self.id <= 0 {
            return;
        }
        self.expired_temp_pins.lock().push(self.id);
    }
}
impl<S> BlockStore<S>
where
    S: StoreParams,
    Ipld: References<S::Codecs>,
{
    /// Open a raw sqlite connection for `db_path`, honoring the read-only and
    /// create flags from `config`. No pragmas or schema setup happen here.
    fn create_connection(db_path: DbPath, config: &Config) -> crate::Result<rusqlite::Connection> {
        // NO_MUTEX: access is serialized by the callers; URI enables `file:` paths.
        let mut flags = OpenFlags::SQLITE_OPEN_NO_MUTEX | OpenFlags::SQLITE_OPEN_URI;
        flags |= if config.read_only {
            OpenFlags::SQLITE_OPEN_READ_ONLY
        } else {
            OpenFlags::SQLITE_OPEN_READ_WRITE
        };
        // CREATE is only added for writable databases.
        if config.create && !config.read_only {
            flags |= OpenFlags::SQLITE_OPEN_CREATE
        }
        // Note: the flags only apply to the file-backed case.
        let conn = match db_path {
            DbPath::Memory => Connection::open_in_memory().ctx("opening in-memory DB")?,
            DbPath::File(path) => Connection::open_with_flags(path, flags).ctx("opening DB")?,
        };
        Ok(conn)
    }
    /// Open a block store at `db_path`, enabling WAL mode and initializing
    /// pragmas and schema via `init_db`.
    ///
    /// For file-backed stores, store statistics are recomputed in a background
    /// thread on a second connection; `recompute_done` flips to true once that
    /// finishes. In-memory stores mark it done immediately.
    pub fn open_path(db_path: DbPath, config: Config) -> crate::Result<Self> {
        let is_memory = db_path.is_memory();
        let mut conn = Self::create_connection(db_path.clone(), &config)?;
        // WAL mode is set up front, before the rest of the initialization.
        conn.execute_batch("PRAGMA journal_mode = WAL")
            .ctx("setting WAL mode")?;
        init_db(
            &mut conn,
            is_memory,
            config.pragma_cache_pages as i64,
            config.pragma_synchronous,
        )?;
        let mut this = Self {
            conn,
            expired_temp_pins: Arc::new(Mutex::new(Vec::new())),
            config,
            db_path,
            recompute_done: Arc::new(AtomicBool::new(false)),
            _s: PhantomData,
        };
        if !is_memory {
            // Run the potentially slow stats recomputation off-thread on its
            // own connection so this handle is usable right away.
            let mut conn = this.additional_connection()?;
            std::thread::spawn(move || {
                if let Err(e) = recompute_store_stats(&mut conn.conn) {
                    tracing::error!("cannot recompute store stats: {}", e);
                }
                conn.recompute_done.store(true, Ordering::SeqCst);
            });
        } else {
            // Nothing to recompute for a fresh in-memory database.
            this.recompute_done.store(true, Ordering::SeqCst);
        }
        if this.config.cache_tracker.has_persistent_state() {
            // Give a persistent cache tracker the chance to drop state for ids
            // that no longer exist in the store.
            let ids = in_txn(
                &mut this.conn,
                Some(("get IDs", Duration::from_secs(1))),
                false,
                get_ids,
            )?;
            this.config.cache_tracker.retain_ids(&ids);
        }
        Ok(this)
    }
    /// Create an additional handle to the same database file, sharing the
    /// expired-pin list and the recompute flag with `self`.
    ///
    /// Fails for in-memory stores, where each connection would see its own
    /// private database.
    pub fn additional_connection(&self) -> crate::Result<Self> {
        if self.db_path.is_memory() {
            return Err(BlockStoreError::NoAdditionalInMemory);
        }
        let mut conn = Self::create_connection(self.db_path.clone(), &self.config)?;
        // Unlike open_path, only pragmas are applied here — the schema already
        // exists.
        init_pragmas(
            &mut conn,
            self.db_path.is_memory(),
            self.config.pragma_cache_pages as i64,
        )?;
        conn.pragma_update(
            None,
            "synchronous",
            &self.config.pragma_synchronous.to_string(),
        )
        .ctx("setting synchronous mode")?;
        Ok(Self {
            conn,
            expired_temp_pins: self.expired_temp_pins.clone(),
            config: self.config.clone(),
            db_path: self.db_path.clone(),
            recompute_done: self.recompute_done.clone(),
            _s: PhantomData,
        })
    }
    /// Open a transient in-memory store.
    pub fn memory(config: Config) -> crate::Result<Self> {
        Self::open_path(DbPath::Memory, config)
    }
    /// Open a file-backed store at `path`.
    pub fn open(path: impl AsRef<Path>, config: Config) -> crate::Result<Self> {
        let mut pb: PathBuf = PathBuf::new();
        pb.push(path);
        Self::open_path(DbPath::File(pb), config)
    }
    /// Open a store by restoring the database file at `path` into memory.
    /// Useful for tests: mutations never touch the original file.
    pub fn open_test(path: impl AsRef<Path>, config: Config) -> crate::Result<Self> {
        let mut conn = Self::create_connection(DbPath::Memory, &config)?;
        debug!(
            "Restoring in memory database from {}",
            path.as_ref().display()
        );
        conn.restore(
            DatabaseName::Main,
            path,
            Some(|p: rusqlite::backup::Progress| {
                // Guard against a zero pagecount to avoid division by zero.
                let percent = if p.pagecount == 0 {
                    100
                } else {
                    (p.pagecount - p.remaining) * 100 / p.pagecount
                };
                if percent % 10 == 0 {
                    debug!("Restoring: {} %", percent);
                }
            }),
        )
        .ctx("restoring test DB from backup")?;
        let ids = in_txn(
            &mut conn,
            Some(("get ids", Duration::from_secs(1))),
            false,
            get_ids,
        )?;
        config.cache_tracker.retain_ids(&ids);
        Ok(Self {
            conn,
            expired_temp_pins: Arc::new(Mutex::new(Vec::new())),
            config,
            db_path: DbPath::Memory,
            // No background recomputation for the restored in-memory copy.
            recompute_done: Arc::new(AtomicBool::new(true)),
            _s: PhantomData,
        })
    }
    /// Back up the main database to `path` using sqlite's backup API.
    pub fn backup(&mut self, path: PathBuf) -> Result<()> {
        in_txn(&mut self.conn, None, false, move |txn| {
            txn.backup(DatabaseName::Main, path.as_path(), None)
                .ctx("backing up DB")
        })
    }
    /// Truncate the WAL, checkpointing all writes into the main DB file.
    pub fn flush(&mut self) -> crate::Result<()> {
        in_txn(&mut self.conn, None, false, |txn| {
            txn.pragma_update(None, "wal_checkpoint", &"TRUNCATE")
                .ctx("flushing WAL")
        })
    }
    /// Run sqlite's integrity check; any result other than a single `ok` is
    /// turned into an error carrying the joined diagnostic text.
    pub fn integrity_check(&mut self) -> crate::Result<()> {
        let result = integrity_check(&mut self.conn)?;
        if result == vec!["ok".to_owned()] {
            Ok(())
        } else {
            let error_text = result.join(";");
            // Error code 11 is SQLITE_CORRUPT.
            Err(crate::error::BlockStoreError::SqliteError(
                rusqlite::Error::SqliteFailure(rusqlite::ffi::Error::new(11), Some(error_text)),
                "checking integrity",
            ))
        }
    }
    /// Begin a transaction on this store.
    pub fn transaction(&mut self) -> Transaction<'_, S> {
        Transaction::new(self)
    }
    /// Create a new, initially unused temporary pin tied to this store's
    /// expiry list.
    pub fn temp_pin(&self) -> TempPin {
        TempPin::new(self.expired_temp_pins.clone())
    }
    /// Run a full `VACUUM` on the database.
    pub fn vacuum(&mut self) -> Result<()> {
        vacuum(&mut self.conn)
    }
    /// Delete all temp pins whose `TempPin` handles have been dropped since
    /// the last cleanup.
    pub fn cleanup_temp_pins(&mut self) -> Result<()> {
        // Take the whole list in one go; the lock is released before the
        // transaction starts.
        let expired_temp_pins = mem::take(self.expired_temp_pins.lock().deref_mut());
        in_txn(
            &mut self.conn,
            Some(("dropping expired temp_pins", Duration::from_millis(100))),
            true,
            move |txn| {
                for id in expired_temp_pins.iter() {
                    delete_temp_pin(txn, *id)?;
                }
                Ok(())
            },
        )
    }
    /// Run a full, effectively unbounded garbage collection pass, then vacuum.
    pub fn gc(&mut self) -> Result<()> {
        self.cleanup_temp_pins()?;
        self.flush()?;
        // usize::MAX blocks / ~136 years: no practical limit on this pass.
        incremental_gc(
            &mut self.conn,
            usize::MAX,
            Duration::from_secs(u32::MAX.into()),
            self.config.size_targets,
            &self.config.cache_tracker,
        )?;
        self.vacuum()?;
        Ok(())
    }
    /// Cap the journal size and restart-checkpoint the WAL, but only once the
    /// background stats recomputation has finished.
    // NOTE(review): the recompute_done gate presumably avoids checkpointing
    // while the stats thread still uses its connection — confirm.
    fn maybe_checkpoint(&mut self) -> Result<()> {
        if self.recompute_done.load(Ordering::SeqCst) {
            self.conn
                .pragma_update(None, "journal_size_limit", 10_000_000i64)
                .ctx("setting journal_size_limit")?;
            self.conn
                .pragma_update(None, "wal_checkpoint", &"RESTART")
                .ctx("running wal_checkpoint(RESTART)")?;
        }
        Ok(())
    }
    /// Run one bounded GC step (at least `min_blocks` candidates, stopping
    /// after roughly `max_duration`), then incrementally return free pages.
    /// Forwards the boolean reported by the db-level `incremental_gc`.
    pub fn incremental_gc(&mut self, min_blocks: usize, max_duration: Duration) -> Result<bool> {
        let stats = self.get_store_stats()?;
        let _span = tracing::debug_span!("incGC", stats = ?&stats).entered();
        self.cleanup_temp_pins()?;
        self.maybe_checkpoint()?;
        let ret = incremental_gc(
            &mut self.conn,
            min_blocks,
            max_duration,
            self.config.size_targets,
            &self.config.cache_tracker,
        )?;
        self.maybe_checkpoint()?;
        // Give freed pages back via incremental vacuum.
        in_txn(
            &mut self.conn,
            Some(("incremental_vacuum", Duration::from_millis(500))),
            false,
            |txn| {
                txn.execute_batch("PRAGMA incremental_vacuum")
                    .ctx("incremental vacuum")
            },
        )?;
        Ok(ret)
    }
}
/// Generates `BlockStore` convenience methods that forward to the
/// `Transaction` method of the same name.
///
/// Each entry `name(arg: Ty, …) -> Ret;` — optionally preceded by attributes
/// and carrying at most one generic parameter — expands to a
/// `pub fn name(&mut self, …) -> Ret` that opens a transaction, calls
/// `txn.name(…)`, commits, and returns the result.
macro_rules! delegate {
    ($($(#[$attr:meta])*$n:ident$(<$v:ident : $vt:path>)?($($arg:ident : $typ:ty),*) -> $ret:ty;)+) => {
        $(
            $(#[$attr])*
            pub fn $n$(<$v: $vt>)?(&mut self, $($arg: $typ),*) -> $ret {
                let mut txn = self.transaction();
                let ret = txn.$n($($arg),*)?;
                txn.commit()?;
                Ok(ret)
            }
        )+
    };
}
impl<S> BlockStore<S>
where
    S: StoreParams,
    Ipld: References<S::Codecs>,
{
    /// Set the named alias to point at `link`, or clear it when `link` is
    /// `None`. Delegates to `Transaction::alias` on a fresh transaction.
    // NOTE(review): unlike the delegate!-generated methods below there is no
    // explicit commit here — presumably the write commits inside `alias` or on
    // transaction drop; confirm in the transaction module.
    pub fn alias<'b>(
        &mut self,
        name: impl Into<Cow<'b, [u8]>>,
        link: Option<&'b Cid>,
    ) -> Result<()> {
        self.transaction().alias(name, link)
    }
    /// Look up the cid an alias points to, if any. Delegates to
    /// `Transaction::resolve` on a fresh transaction.
    pub fn resolve<'b>(&mut self, name: impl Into<Cow<'b, [u8]>>) -> Result<Option<Cid>> {
        self.transaction().resolve(name)
    }
    // Each entry below becomes a method that runs the same-named `Transaction`
    // method inside its own committed transaction; see the `delegate!` macro.
    delegate! {
        /// Alias names for a cid, if any; see `Transaction::reverse_alias`.
        reverse_alias(cid: &Cid) -> Result<Option<HashSet<Vec<u8>>>>;
        /// Extend a temp pin to cover `link`; see `Transaction::extend_temp_pin`.
        extend_temp_pin(pin: &mut TempPin, link: &Cid) -> Result<()>;
        /// See `Transaction::has_cid`.
        has_cid(cid: &Cid) -> Result<bool>;
        /// See `Transaction::has_block`.
        has_block(cid: &Cid) -> Result<bool>;
        /// Collect cids; see `Transaction::get_known_cids`.
        get_known_cids<C: FromIterator<Cid>>() -> Result<C>;
        /// Collect cids; see `Transaction::get_block_cids`.
        get_block_cids<C: FromIterator<Cid>>() -> Result<C>;
        /// Collect descendant cids of `cid`; see `Transaction::get_descendants`.
        get_descendants<C: FromIterator<Cid>>(cid: &Cid) -> Result<C>;
        /// Collect missing block cids under `cid`; see
        /// `Transaction::get_missing_blocks`.
        get_missing_blocks<C: FromIterator<Cid>>(cid: &Cid) -> Result<C>;
        /// Collect (alias name, cid) pairs; see `Transaction::aliases`.
        aliases<C: FromIterator<(Vec<u8>, Cid)>>() -> Result<C>;
        /// Store one block, optionally under a temp pin; see
        /// `Transaction::put_block`.
        put_block(block: Block<S>, pin: Option<&mut TempPin>) -> Result<()>;
        /// Raw block data for `cid`, if present; see `Transaction::get_block`.
        get_block(cid: &Cid) -> Result<Option<Vec<u8>>>;
        /// Current store statistics; see `Transaction::get_store_stats`.
        get_store_stats() -> Result<StoreStats>;
    }
    /// Store several blocks in a single committed transaction, passing the
    /// optional temp pin along to each `put_block` call.
    pub fn put_blocks<I>(&mut self, blocks: I, mut pin: Option<&mut TempPin>) -> Result<()>
    where
        I: IntoIterator<Item = Block<S>>,
    {
        let mut txn = self.transaction();
        for block in blocks {
            // Reborrow the pin for each iteration instead of moving it.
            #[allow(clippy::needless_option_as_deref)]
            txn.put_block(block, pin.as_deref_mut())?;
        }
        txn.commit()
    }
}