ipfs-sqlite-block-store 0.3.1

block store for ipfs, using sqlite
Documentation
//! # IPFS sqlite block store
//!
//! A block store for a rust implementation of [ipfs](https://ipfs.io/).
//!
//! # Concepts
//!
//! ## Aliases
//!
//! An alias is a named pin of a root. When a root is aliased, neither the root nor any of the
//! blocks of the dag reachable from it will be collected by gc. However, a root being aliased
//! does not mean that the dag must be complete.
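//!
//! A minimal sketch of working with an alias (the `"my-dag"` name and the `pin_root` helper are
//! made up for illustration):
//!
//! ```no_run
//! use ipfs_sqlite_block_store::BlockStore;
//! use libipld::Cid;
//!
//! fn pin_root(store: &mut BlockStore, root: &Cid) -> ipfs_sqlite_block_store::Result<()> {
//!     // pin the dag under a name; blocks reachable from the root are now protected from gc
//!     store.alias("my-dag", Some(root))?;
//!     // the name can be resolved back to the root
//!     assert_eq!(store.resolve("my-dag")?, Some(*root));
//!     // aliasing the name to None removes the pin again
//!     store.alias("my-dag", None)
//! }
//! ```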
//!
//! ## Temporary aliases
//!
//! A temporary alias is an unnamed alias that is just for the purpose of protecting blocks from gc
//! while a large tree is being constructed. While an alias maps a single name to a single root, a
//! temporary alias can be assigned to an arbitrary number of blocks before the dag is finished.
//!
//! A temporary alias will be deleted as soon as the handle goes out of scope.
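//!
//! A sketch of protecting blocks with a temporary alias while a dag is being assembled
//! (`add_tree` and the `"my-tree"` name are made up; the blocks are assumed to use a codec
//! known to libipld so that links can be extracted):
//!
//! ```no_run
//! use ipfs_sqlite_block_store::{BlockStore, OwnedBlock};
//! use libipld::Cid;
//!
//! fn add_tree(
//!     store: &mut BlockStore,
//!     blocks: Vec<OwnedBlock>,
//!     root: Cid,
//! ) -> ipfs_sqlite_block_store::Result<()> {
//!     // everything added with this pin is protected from gc until the pin is dropped
//!     let pin = store.temp_pin();
//!     for block in blocks {
//!         store.put_block(&block, Some(&pin))?;
//!     }
//!     // once the dag is complete, give its root a permanent name
//!     store.alias("my-tree", Some(&root))?;
//!     Ok(())
//! }
//! ```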
//!
//! ## Garbage Collection (GC)
//!
//! GC refers to the process of removing unpinned blocks. It runs only when the configured size
//! targets are exceeded. [Size targets](SizeTargets) contain both the total size of the store
//! and the number of blocks.
//!
//! GC will run incrementally, deleting blocks until the size targets are no longer exceeded. The
//! order in which unpinned blocks will be deleted can be customized.
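//!
//! A sketch of configuring size targets and triggering gc (the numbers and the
//! `"blocks.sqlite"` path are arbitrary):
//!
//! ```no_run
//! use ipfs_sqlite_block_store::{BlockStore, Config, SizeTargets};
//! use std::time::Duration;
//!
//! fn open_and_gc() -> ipfs_sqlite_block_store::Result<()> {
//!     // gc of unpinned blocks kicks in above 10_000 blocks or 100 MiB of block data
//!     let config = Config::default().with_size_targets(SizeTargets::new(10_000, 100 * 1024 * 1024));
//!     let mut store = BlockStore::open("blocks.sqlite", config)?;
//!     // either run a full gc ...
//!     store.gc()?;
//!     // ... or collect incrementally: at least 1000 blocks, aiming for roughly 250 ms per step,
//!     // and then delete the orphaned block data
//!     store.incremental_gc(1000, Duration::from_millis(250))?;
//!     store.incremental_delete_orphaned(1000, Duration::from_millis(250))?;
//!     Ok(())
//! }
//! ```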
//!
//! ## Caching
//!
//! For unpinned blocks, it is possible to customize which blocks have the highest value using a
//! [CacheTracker](cache::CacheTracker). The default is to [do nothing](cache::NoopCacheTracker),
//! which has no performance overhead.
//!
//! The most elaborate implemented strategy is to keep track of access times in a separate database,
//! via the [SqliteCacheTracker](cache::SqliteCacheTracker), which has a slight performance overhead.
//!
//! The performance overhead of writing to an access tracking database on each block read can be
//! mitigated by using the [AsyncCacheTracker](cache::AsyncCacheTracker) wrapper to perform the database
//! writes on a different thread.
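//!
//! A sketch of plugging a cache tracker into the store configuration (shown with the no-op
//! tracker; a [SqliteCacheTracker](cache::SqliteCacheTracker) would be passed in the same way
//! via `with_cache_tracker`):
//!
//! ```no_run
//! use ipfs_sqlite_block_store::{cache::NoopCacheTracker, BlockStore, Config};
//!
//! fn open_with_tracker() -> ipfs_sqlite_block_store::Result<BlockStore> {
//!     // replace NoopCacheTracker with a tracker that records access times to make gc
//!     // prefer deleting the least valuable unpinned blocks
//!     let config = Config::default().with_cache_tracker(NoopCacheTracker);
//!     BlockStore::memory(config)
//! }
//! ```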
//!
//! # Usage
//!
//! ## Blocking
//!
//! For blocking usage, use [BlockStore](BlockStore). This is the lowest-level interface.
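//!
//! A minimal blocking round trip (a sketch; `cid` is assumed to actually match `data` and to
//! use a codec known to libipld):
//!
//! ```no_run
//! use ipfs_sqlite_block_store::{BlockStore, Config, OwnedBlock};
//! use libipld::Cid;
//!
//! fn roundtrip(cid: Cid, data: Vec<u8>) -> ipfs_sqlite_block_store::Result<()> {
//!     let mut store = BlockStore::memory(Config::default())?;
//!     store.put_block(&OwnedBlock::new(cid, data), None)?;
//!     assert!(store.get_block(&cid)?.is_some());
//!     Ok(())
//! }
//! ```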
//!
//! ## Non-blocking
//!
//! For non-blocking usage, use [AsyncBlockStore](async_block_store::AsyncBlockStore). This is a
//! wrapper that is meant to be used from async rust. In addition to wrapping most methods of
//! [BlockStore], it provides a method [gc_loop](async_block_store::AsyncBlockStore::gc_loop) to
//! run gc continuously.
//!
//! # Major differences to the go-ipfs pinning concept
//!
//! - Pinning/aliasing a root does not require that the dag is complete (see the sketch below)
//! - Aliases/named pins as opposed to unnamed and non-reference-counted pins
//! - Temporary pins as a mechanism to keep blocks safe from gc while a tree is being constructed
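//!
//! For example, a root can be aliased before its dag is complete;
//! [get_missing_blocks](BlockStore::get_missing_blocks) then reports what still needs to be
//! fetched (a sketch, with a made-up alias name):
//!
//! ```no_run
//! use ipfs_sqlite_block_store::BlockStore;
//! use libipld::Cid;
//!
//! fn missing(store: &mut BlockStore, root: &Cid) -> ipfs_sqlite_block_store::Result<Vec<Cid>> {
//!     // pin the (possibly incomplete) dag so it survives gc while it is being synced
//!     store.alias("syncing", Some(root))?;
//!     // all cids reachable from `root` for which we do not have data yet
//!     store.get_missing_blocks(root)
//! }
//! ```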
pub mod cache;
mod cidbytes;
mod db;
mod error;
#[cfg(test)]
mod tests;

use crate::cidbytes::CidBytes;
use cache::{BlockInfo, CacheTracker, NoopCacheTracker, WriteInfo};
use db::*;
pub use error::{BlockStoreError, Result};
use fnv::FnvHashSet;
use libipld::{
    cid::{self, Cid},
    codec::Codec,
    store::StoreParams,
    Ipld, IpldCodec,
};
use parking_lot::Mutex;
use rusqlite::{Connection, DatabaseName, OpenFlags};
use std::{
    convert::TryFrom,
    fmt,
    iter::FromIterator,
    ops::DerefMut,
    path::{Path, PathBuf},
    sync::{
        atomic::{AtomicI64, Ordering},
        Arc,
    },
    time::Duration,
};
use tracing::*;

#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum DbPath {
    File(PathBuf),
    Memory,
}

impl DbPath {
    fn is_memory(&self) -> bool {
        !matches!(self, DbPath::File(_))
    }
}

/// Size targets for a store. Gc of non-pinned blocks will start once one of the size targets is exceeded.
///
/// There are targets for both block count and block size. The reason for this is that a store that has
/// a very large number of tiny blocks will become sluggish despite not having a large total size.
///
/// Size targets only apply to non-pinned blocks. Pinned blocks will never be garbage collected,
/// even if one of the size targets is exceeded.
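///
/// A short sketch (the numbers are arbitrary):
///
/// ```no_run
/// use ipfs_sqlite_block_store::SizeTargets;
///
/// // gc of unpinned blocks starts once the store holds more than 100_000 blocks
/// // or more than 1 GiB of block data
/// let targets = SizeTargets::new(100_000, 1024 * 1024 * 1024);
/// // size targets that can never be exceeded, effectively disabling gc
/// let unlimited = SizeTargets::max_value();
/// ```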
#[derive(Debug, Clone, Copy, Default)]
pub struct SizeTargets {
    /// target number of blocks.
    ///
    /// Up to this number, the store will retain everything even if not pinned.
    /// Once this number is exceeded, the store will run garbage collection of all
    /// unpinned blocks until the block criterion is met again.
    ///
    /// To completely disable storing of non-pinned blocks, set this to 0.
    /// Even then, the store will never delete pinned blocks.
    pub count: u64,

    /// target store size.
    ///
    /// Up to this size, the store will retain everything even if not pinned.
    /// Once this size is exceeded, the store will run garbage collection of all
    /// unpinned blocks until the size criterion is met again.
    ///
    /// The store will never delete pinned blocks.
    pub size: u64,
}

impl SizeTargets {
    pub fn new(count: u64, size: u64) -> Self {
        Self { count, size }
    }

    pub fn exceeded(&self, stats: &StoreStats) -> bool {
        stats.count > self.count || stats.size > self.size
    }

    /// Size targets that can never be exceeded. This can be used to effectively disable gc.
    pub fn max_value() -> Self {
        Self {
            count: u64::max_value(),
            size: u64::max_value(),
        }
    }
}

#[derive(Debug, Clone, Copy)]
pub enum Synchronous {
    // this is the most conservative mode. This only works if we have few, large transactions
    Full,
    Normal,
    Off,
}

impl fmt::Display for Synchronous {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(match self {
            Synchronous::Full => "FULL",
            Synchronous::Normal => "NORMAL",
            Synchronous::Off => "OFF",
        })
    }
}

#[derive(Debug)]
pub struct Config {
    size_targets: SizeTargets,
    cache_tracker: Box<dyn CacheTracker>,
    pragma_synchronous: Synchronous,
    pragma_cache_pages: u64,
    // open in readonly mode
    read_only: bool,
    // create if it does not yet exist
    create: bool,
}

impl Default for Config {
    fn default() -> Self {
        Self {
            size_targets: Default::default(),
            cache_tracker: Box::new(NoopCacheTracker),
            pragma_synchronous: Synchronous::Full, // most conservative setting
            pragma_cache_pages: 8192, // 32 megabytes with the default page size of 4096
            read_only: false,
            create: true,
        }
    }
}

impl Config {
    pub fn with_read_only(mut self, value: bool) -> Self {
        self.read_only = value;
        self
    }
    /// Set size targets for the store
    pub fn with_size_targets(mut self, size_targets: SizeTargets) -> Self {
        self.size_targets = size_targets;
        self
    }
    /// Set strategy for which non-pinned blocks to keep in case one of the size targets is exceeded.
    pub fn with_cache_tracker<T: CacheTracker + 'static>(mut self, cache_tracker: T) -> Self {
        self.cache_tracker = Box::new(cache_tracker);
        self
    }
    pub fn with_pragma_synchronous(mut self, value: Synchronous) -> Self {
        self.pragma_synchronous = value;
        self
    }
    pub fn with_pragma_cache_pages(mut self, value: u64) -> Self {
        self.pragma_cache_pages = value;
        self
    }
}

pub struct BlockStore {
    conn: Connection,
    expired_temp_pins: Arc<Mutex<Vec<i64>>>,
    config: Config,
}

#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct StoreStats {
    count: u64,
    size: u64,
}

impl StoreStats {
    /// Total number of blocks in the store
    pub fn count(&self) -> u64 {
        self.count
    }

    /// Total size of blocks in the store
    pub fn size(&self) -> u64 {
        self.size
    }
}

// do not implement Clone for this!
/// a handle that contains a temporary pin
///
/// dropping this handle enqueues the pin for removal before the next gc.
pub struct TempPin {
    id: AtomicI64,
    expired_temp_pins: Arc<Mutex<Vec<i64>>>,
}

/// dump the temp alias id so you can find it in the database
impl fmt::Debug for TempPin {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let id = self.id.load(Ordering::SeqCst);
        let mut builder = f.debug_struct("TempPin");
        if id > 0 {
            builder.field("id", &id);
        }
        builder.finish()
    }
}

impl Drop for TempPin {
    fn drop(&mut self) {
        let id = self.id.get_mut();
        let alias = *id;
        if alias > 0 {
            // not sure if we have to guard against double drop, but it certainly does not hurt.
            *id = 0;
            self.expired_temp_pins.lock().push(alias);
        }
    }
}

/// An ipfs block
pub trait Block {
    fn cid(&self) -> &Cid;
    fn data(&self) -> &[u8];
}

impl<S: StoreParams> Block for libipld::Block<S> {
    fn cid(&self) -> &Cid {
        libipld::Block::cid(&self)
    }

    fn data(&self) -> &[u8] {
        libipld::Block::data(&self)
    }
}

/// Block that owns its data
#[derive(Debug, Clone)]
pub struct OwnedBlock {
    cid: Cid,
    data: Box<[u8]>,
}

impl OwnedBlock {
    pub fn new(cid: Cid, data: impl Into<Box<[u8]>>) -> Self {
        Self {
            cid,
            data: data.into(),
        }
    }
}

impl Block for OwnedBlock {
    fn cid(&self) -> &Cid {
        &self.cid
    }

    fn data(&self) -> &[u8] {
        &self.data
    }
}

struct BorrowedBlock<'a> {
    cid: Cid,
    data: &'a [u8],
}

impl<'a> BorrowedBlock<'a> {
    fn new(cid: Cid, data: &'a [u8]) -> Self {
        Self { cid, data }
    }
}

impl<'a> Block for BorrowedBlock<'a> {
    fn cid(&self) -> &Cid {
        &self.cid
    }

    fn data(&self) -> &[u8] {
        self.data
    }
}

fn links(block: &impl Block) -> anyhow::Result<Vec<Cid>> {
    let mut links = Vec::new();
    IpldCodec::try_from(block.cid().codec())?
        .references::<Ipld, Vec<_>>(&block.data(), &mut links)?;
    Ok(links)
}

impl BlockStore {
    fn create_connection(db_path: DbPath, config: &Config) -> crate::Result<rusqlite::Connection> {
        let mut flags = OpenFlags::SQLITE_OPEN_NO_MUTEX | OpenFlags::SQLITE_OPEN_URI;
        flags |= if config.read_only {
            OpenFlags::SQLITE_OPEN_READ_ONLY
        } else {
            OpenFlags::SQLITE_OPEN_READ_WRITE
        };
        if config.create && !config.read_only {
            flags |= OpenFlags::SQLITE_OPEN_CREATE
        }
        let conn = match db_path {
            DbPath::Memory => Connection::open_in_memory()?,
            DbPath::File(path) => Connection::open_with_flags(path, flags)?,
        };
        Ok(conn)
    }

    pub fn open_path(db_path: DbPath, config: Config) -> crate::Result<Self> {
        let is_memory = db_path.is_memory();
        let mut conn = Self::create_connection(db_path, &config)?;
        init_db(
            &mut conn,
            is_memory,
            config.pragma_cache_pages as i64,
            config.pragma_synchronous,
        )?;
        let ids = in_txn(&mut conn, |txn| get_ids(txn))?;
        config.cache_tracker.retain_ids(&ids);
        Ok(Self {
            conn,
            expired_temp_pins: Arc::new(Mutex::new(Vec::new())),
            config,
        })
    }

    /// Create an in memory block store with the given config
    pub fn memory(config: Config) -> crate::Result<Self> {
        Self::open_path(DbPath::Memory, config)
    }

    /// Create a persistent block store with the given config
    pub fn open(path: impl AsRef<Path>, config: Config) -> crate::Result<Self> {
        let mut pb: PathBuf = PathBuf::new();
        pb.push(path);
        Self::open_path(DbPath::File(pb), config)
    }

    /// Open the file at the given path for testing.
    ///
    /// This will create a writeable in-memory database that is initialized with the content
    /// of the file at the given path.
    pub fn open_test(path: impl AsRef<Path>, config: Config) -> crate::Result<Self> {
        let mut conn = Self::create_connection(DbPath::Memory, &config)?;
        debug!(
            "Restoring in memory database from {}",
            path.as_ref().display()
        );
        conn.restore(
            DatabaseName::Main,
            path,
            Some(|p: rusqlite::backup::Progress| {
                let percent = if p.pagecount == 0 {
                    100
                } else {
                    (p.pagecount - p.remaining) * 100 / p.pagecount
                };
                if percent % 10 == 0 {
                    debug!("Restoring: {} %", percent);
                }
            }),
        )?;
        let ids = in_txn(&mut conn, |txn| get_ids(txn))?;
        config.cache_tracker.retain_ids(&ids);
        Ok(Self {
            conn,
            expired_temp_pins: Arc::new(Mutex::new(Vec::new())),
            config,
        })
    }

    pub fn flush(&self) -> crate::Result<()> {
        // TODO: check if this works! We are always in WAL mode.
        // https://www.sqlite.org/pragma.html#pragma_wal_checkpoint
        Ok(self.conn.pragma_update(None, "wal_checkpoint", &"FULL")?)
    }

    pub fn integrity_check(&self) -> crate::Result<()> {
        let result = integrity_check(&self.conn)?;
        if result == vec!["ok".to_owned()] {
            Ok(())
        } else {
            let error_text = result.join(";");
            Err(crate::error::BlockStoreError::SqliteError(
                rusqlite::Error::SqliteFailure(rusqlite::ffi::Error::new(11), Some(error_text)),
            ))
        }
    }

    /// Get a temporary alias for safely adding blocks to the store
    pub fn temp_pin(&self) -> TempPin {
        TempPin {
            id: AtomicI64::new(0),
            expired_temp_pins: self.expired_temp_pins.clone(),
        }
    }

    /// Add a permanent named alias/pin for a root
    pub fn alias(&mut self, name: impl AsRef<[u8]>, link: Option<&Cid>) -> crate::Result<()> {
        self.alias_many(std::iter::once((name, link.cloned())))
    }

    /// Resolves an alias to a cid.
    pub fn resolve(&self, name: impl AsRef<[u8]>) -> crate::Result<Option<Cid>> {
        in_ro_txn(&self.conn, |txn| {
            Ok(resolve::<CidBytes>(txn, name.as_ref())?
                .map(|c| Cid::try_from(&c))
                .transpose()?)
        })
    }

    /// Add multiple permanent named aliases
    pub fn alias_many(
        &mut self,
        aliases: impl IntoIterator<Item = (impl AsRef<[u8]>, Option<Cid>)>,
    ) -> crate::Result<()> {
        in_txn(&mut self.conn, |txn| {
            for (name, link) in aliases.into_iter() {
                let link: Option<CidBytes> = link.map(|x| CidBytes::try_from(&x)).transpose()?;
                alias(txn, name.as_ref(), link.as_ref())?;
            }
            Ok(())
        })
    }

    pub fn extend_temp_pin(
        &mut self,
        pin: &TempPin,
        links: impl IntoIterator<Item = Cid>,
    ) -> crate::Result<()> {
        let pin0 = pin.id.load(Ordering::SeqCst);
        let pin0 = in_txn(&mut self.conn, |txn| {
            let links = links
                .into_iter()
                .map(|x| CidBytes::try_from(&x))
                .collect::<std::result::Result<Vec<_>, cid::Error>>()?;
            extend_temp_pin(txn, pin0, links)
        })?;
        pin.id.store(pin0, Ordering::SeqCst);
        Ok(())
    }

    /// Returns the aliases referencing a block.
    pub fn reverse_alias(&self, cid: &Cid) -> crate::Result<Option<Vec<Vec<u8>>>> {
        let cid = CidBytes::try_from(cid)?;
        in_ro_txn(&self.conn, |txn| reverse_alias(txn, cid.as_ref()))
    }

    /// Checks if the store knows about the cid.
    /// Note that this does not necessarily mean that the store has the data for the cid.
    pub fn has_cid(&self, cid: &Cid) -> Result<bool> {
        let cid = CidBytes::try_from(cid)?;
        in_ro_txn(&self.conn, |txn| has_cid(txn, cid))
    }

    /// Checks if the store has the data for a cid
    pub fn has_block(&mut self, cid: &Cid) -> Result<bool> {
        let cid = CidBytes::try_from(cid)?;
        in_ro_txn(&self.conn, |txn| has_block(txn, cid))
    }

    /// Look up multiple blocks in one read transaction
    pub fn has_blocks<I, O>(&self, cids: I) -> Result<O>
    where
        I: IntoIterator<Item = Cid>,
        O: FromIterator<(Cid, bool)>,
    {
        in_ro_txn(&self.conn, |txn| {
            cids.into_iter()
                .map(|cid| -> Result<(Cid, bool)> {
                    Ok((cid, has_block(txn, CidBytes::try_from(&cid)?)?))
                })
                .collect::<crate::Result<O>>()
        })
    }

    /// Get the stats for the store.
    ///
    /// The stats are kept up to date, so this is fast.
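    ///
    /// A small sketch of reading the stats:
    ///
    /// ```no_run
    /// use ipfs_sqlite_block_store::BlockStore;
    ///
    /// fn log_stats(store: &BlockStore) -> ipfs_sqlite_block_store::Result<()> {
    ///     let stats = store.get_store_stats()?;
    ///     println!("{} blocks, {} bytes", stats.count(), stats.size());
    ///     Ok(())
    /// }
    /// ```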
    pub fn get_store_stats(&self) -> Result<StoreStats> {
        in_ro_txn(&self.conn, get_store_stats)
    }

    /// Get all cids that the store knows about
    pub fn get_known_cids<C: FromIterator<Cid>>(&mut self) -> Result<C> {
        let res = in_ro_txn(&self.conn, |txn| Ok(get_known_cids::<CidBytes>(txn)?))?;
        let res = res.iter().map(Cid::try_from).collect::<cid::Result<C>>()?;
        Ok(res)
    }

    /// Get all cids for which the store has blocks
    pub fn get_block_cids<C: FromIterator<Cid>>(&self) -> Result<C> {
        let res = in_ro_txn(&self.conn, |txn| Ok(get_block_cids::<CidBytes>(txn)?))?;
        let res = res.iter().map(Cid::try_from).collect::<cid::Result<C>>()?;
        Ok(res)
    }

    /// Get descendants of a cid
    pub fn get_descendants<C: FromIterator<Cid>>(&self, cid: &Cid) -> Result<C> {
        let cid = CidBytes::try_from(cid)?;
        let res = in_ro_txn(&self.conn, move |txn| get_descendants(txn, cid))?;
        let res = res.iter().map(Cid::try_from).collect::<cid::Result<C>>()?;
        Ok(res)
    }

    /// Given the root of a dag, returns all cids of the dag for which we do not have the data.
    pub fn get_missing_blocks<C: FromIterator<Cid>>(&self, cid: &Cid) -> Result<C> {
        let cid = CidBytes::try_from(cid)?;
        let result = log_execution_time("get_missing_blocks", Duration::from_millis(10), || {
            in_ro_txn(&self.conn, move |txn| get_missing_blocks(txn, cid))
        })?;
        let res = result
            .iter()
            .map(Cid::try_from)
            .collect::<cid::Result<C>>()?;
        Ok(res)
    }

    /// list all aliases
    pub fn aliases<C: FromIterator<(Vec<u8>, Cid)>>(&self) -> Result<C> {
        let result: Vec<(Vec<u8>, CidBytes)> = in_ro_txn(&self.conn, aliases)?;
        let res = result
            .into_iter()
            .map(|(alias, cid)| {
                let cid = Cid::try_from(&cid)?;
                Ok((alias, cid))
            })
            .collect::<cid::Result<C>>()?;
        Ok(res)
    }

    pub fn vacuum(&self) -> Result<()> {
        vacuum(&self.conn)
    }

    /// do a full garbage collection
    ///
    /// for a large block store, this can take several seconds to minutes. If that is not acceptable,
    /// consider using incremental gc.
    pub fn gc(&mut self) -> Result<()> {
        loop {
            let complete = self.incremental_gc(20000, Duration::from_secs(1))?;
            while !self.incremental_delete_orphaned(20000, Duration::from_secs(1))? {}
            if complete {
                break;
            }
        }
        Ok(())
    }
    /// Perform an incremental garbage collection.
    ///
    /// Will collect unpinned blocks until either the size targets are met again, or at minimum
    /// `min_blocks` blocks are collected. Then it will continue collecting blocks until `max_duration`
    /// has elapsed.
    ///
    /// Note that this might significantly exceed `max_duration` for various reasons. Also note that
    /// when doing incremental gc, the actual blocks are not yet deleted. So a call to this method
    /// should usually be followed by a call to incremental_delete_orphaned.
    ///
    /// - `min_blocks` the minimum number of blocks to collect in any case
    /// - `max_duration` the maximum duration that should be spent on gc
    ///
    /// Returns true if either size targets are met or there are no unpinned blocks left.
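    ///
    /// A sketch of a typical incremental collection cycle (the budget values are arbitrary):
    ///
    /// ```no_run
    /// use ipfs_sqlite_block_store::BlockStore;
    /// use std::time::Duration;
    ///
    /// fn collect(store: &mut BlockStore) -> ipfs_sqlite_block_store::Result<()> {
    ///     // collect unpinned blocks until the size targets are met again
    ///     while !store.incremental_gc(10_000, Duration::from_millis(250))? {}
    ///     // then actually delete the data of the orphaned blocks
    ///     while !store.incremental_delete_orphaned(10_000, Duration::from_millis(250))? {}
    ///     Ok(())
    /// }
    /// ```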
    pub fn incremental_gc(&mut self, min_blocks: usize, max_duration: Duration) -> Result<bool> {
        // atomically grab the expired_temp_pins until now
        let expired_temp_pins = {
            let mut result = Vec::new();
            std::mem::swap(self.expired_temp_pins.lock().deref_mut(), &mut result);
            result
        };
        let (deleted, complete) = log_execution_time("gc", Duration::from_secs(1), || {
            let size_targets = self.config.size_targets;
            let cache_tracker = &self.config.cache_tracker;
            in_txn(&mut self.conn, move |txn| {
                // get rid of dropped temp aliases, this should be fast
                for id in expired_temp_pins {
                    delete_temp_pin(txn, id)?;
                }
                Ok(incremental_gc(
                    &txn,
                    min_blocks,
                    max_duration,
                    size_targets,
                    cache_tracker,
                )?)
            })
        })?;
        self.config.cache_tracker.blocks_deleted(deleted);
        Ok(complete)
    }
    /// Incrementally delete orphaned blocks
    ///
    /// Orphaned blocks are blocks for which we have deleted the metadata in `incremental_gc`.
    ///
    /// Will delete orphaned blocks until either all orphaned blocks are deleted, or at minimum
    /// `min_blocks` blocks are deleted. Then it will continue deleting blocks until `max_duration`
    /// has elapsed.
    ///
    /// Note that this might significantly exceed `max_duration` for various reasons.
    ///
    /// - `min_blocks` the minimum number of blocks to delete in any case
    /// - `max_duration` the maximum duration that should be spent on gc
    ///
    /// Returns true if all orphaned blocks are deleted
    pub fn incremental_delete_orphaned(
        &mut self,
        min_blocks: usize,
        max_duration: Duration,
    ) -> Result<bool> {
        Ok(log_execution_time(
            "delete_orphaned",
            Duration::from_millis(100),
            || {
                in_txn(&mut self.conn, move |txn| {
                    Ok(incremental_delete_orphaned(txn, min_blocks, max_duration)?)
                })
            },
        )?)
    }
    /// Add a number of blocks to the store
    ///
    /// Links are extracted from the blocks via their IPLD codec. Note that the store does not know
    /// anything about content-addressing and will not validate that the cid of a block is the
    /// actual hash of the content.
    ///
    /// - `blocks` the blocks to add.
    ///   Even if we already have some of these blocks, the pin will still be applied to them.
    ///   However, it will not be checked that the links or data are the same as the last time
    ///   the block was added. That is the responsibility of the caller.
    /// - `pin` an optional temporary pin.
    ///   This can be used to incrementally add blocks without having to worry about them being
    ///   garbage collected before they can be pinned with a permanent alias.
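    ///
    /// A sketch of adding a batch of blocks under a temporary pin and then aliasing the root
    /// (names and helpers are made up; blocks are assumed to use codecs known to libipld):
    ///
    /// ```no_run
    /// use ipfs_sqlite_block_store::{BlockStore, OwnedBlock};
    /// use libipld::Cid;
    ///
    /// fn add_batch(
    ///     store: &mut BlockStore,
    ///     blocks: Vec<OwnedBlock>,
    ///     root: Cid,
    /// ) -> ipfs_sqlite_block_store::Result<()> {
    ///     let pin = store.temp_pin();
    ///     store.put_blocks(blocks, Some(&pin))?;
    ///     store.alias("batch-root", Some(&root))?;
    ///     Ok(())
    /// }
    /// ```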
    pub fn put_blocks<B: Block>(
        &mut self,
        blocks: impl IntoIterator<Item = B>,
        pin: Option<&TempPin>,
    ) -> Result<()> {
        let mut pin0 = pin.map(|pin| pin.id.load(Ordering::SeqCst));
        let infos = in_txn(&mut self.conn, |txn| {
            Ok(blocks
                .into_iter()
                .map(|block| {
                    let cid_bytes = CidBytes::try_from(block.cid())?;
                    let links = links(&block)?
                        .iter()
                        .map(CidBytes::try_from)
                        .collect::<std::result::Result<FnvHashSet<_>, cid::Error>>()?;
                    let res = put_block(txn, &cid_bytes, &block.data(), links, &mut pin0)?;
                    Ok(WriteInfo::new(
                        BlockInfo::new(res.id, block.cid(), block.data().len()),
                        res.block_exists,
                    ))
                })
                .collect::<Result<Vec<_>>>()?)
        })?;
        if let (Some(pin), Some(p)) = (pin, pin0) {
            pin.id.store(p, Ordering::SeqCst);
        }
        self.config.cache_tracker.blocks_written(infos);
        Ok(())
    }
    /// Add a single block
    ///
    /// This is just a convenience method that calls put_blocks internally.
    ///
    /// - `block` the block to add.
    ///   Its cid should be the hash of its data, with some format specifier; the store does not
    ///   validate this.
    /// - `alias` an optional temporary pin
    pub fn put_block(&mut self, block: &impl Block, alias: Option<&TempPin>) -> Result<()> {
        let bb = BorrowedBlock::new(*block.cid(), block.data());
        self.put_blocks(Some(bb), alias)?;
        Ok(())
    }
    /// Get multiple blocks in a single read transaction
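    ///
    /// For example, fetching a batch of cids and keeping only the blocks we have (a sketch):
    ///
    /// ```no_run
    /// use ipfs_sqlite_block_store::BlockStore;
    /// use libipld::Cid;
    ///
    /// fn read_many(
    ///     store: &BlockStore,
    ///     cids: Vec<Cid>,
    /// ) -> ipfs_sqlite_block_store::Result<Vec<(Cid, Vec<u8>)>> {
    ///     Ok(store
    ///         .get_blocks(cids)?
    ///         .filter_map(|(cid, data)| data.map(|d| (cid, d)))
    ///         .collect())
    /// }
    /// ```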
    pub fn get_blocks<I>(&self, cids: I) -> Result<impl Iterator<Item = (Cid, Option<Vec<u8>>)>>
    where
        I: IntoIterator<Item = Cid>,
    {
        let res = in_ro_txn(&self.conn, |txn| {
            cids.into_iter()
                .map(|cid| Ok((cid, get_block(txn, &CidBytes::try_from(&cid)?)?)))
                .collect::<crate::Result<Vec<_>>>()
        })?;
        let infos = res
            .iter()
            .filter_map(|(cid, res)| {
                res.as_ref()
                    .map(|(id, data)| BlockInfo::new(*id, cid, data.len()))
            })
            .collect::<Vec<_>>();
        self.config.cache_tracker.blocks_accessed(infos);
        Ok(res
            .into_iter()
            .map(|(cid, res)| (cid, res.map(|(_, data)| data))))
    }
    /// Get data for a block
    ///
    /// Will return None if we don't have the data
    pub fn get_block(&self, cid: &Cid) -> Result<Option<Vec<u8>>> {
        Ok(self.get_blocks(std::iter::once(*cid))?.next().unwrap().1)
    }

    /// the underlying rusqlite connection
    pub fn connection(&self) -> &Connection {
        &self.conn
    }

    /// the underlying rusqlite connection as a mutable reference
    pub fn connection_mut(&mut self) -> &mut Connection {
        &mut self.conn
    }
}