use crate::error::Error;
use crate::path::IpfsPath;
use crate::{Block, StoragePath};
use anyhow::anyhow;
use async_trait::async_trait;
use core::fmt::Debug;
use futures::channel::mpsc::{channel, Receiver, Sender};
use futures::future::BoxFuture;
use futures::sink::SinkExt;
use futures::stream::{BoxStream, FuturesOrdered};
use futures::{FutureExt, StreamExt, TryStreamExt};
use libipld::cid::Cid;
use libipld::{Ipld, IpldCodec};
use libp2p::identity::PeerId;
use parking_lot::{Mutex, RwLock};
use std::borrow::Borrow;
use std::collections::{BTreeSet, HashMap};
use std::path::Path;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
use std::{error, fmt, io};
use tracing::log;
// Test-only helper module; `#[macro_use]` makes its macros usable by the
// modules declared after it.
#[macro_use]
#[cfg(test)]
mod common_tests;
// Submodules of this repository module.
pub mod blockstore;
pub mod datastore;
pub mod lock;
pub(crate) mod paths;
/// Outcome reported by [`BlockStore::put`] alongside the block's [`Cid`].
#[derive(Debug, PartialEq, Eq)]
pub enum BlockPut {
    /// The only outcome for which [`Repo::put_block`] emits events and wakes
    /// waiting subscribers.
    NewBlock,
    /// NOTE(review): presumably the block was already stored — confirm
    /// against the store implementations.
    Existed,
}
/// Successful outcome of [`BlockStore::remove`].
#[derive(Debug)]
pub enum BlockRm {
    /// A block was removed; carries a [`Cid`] (presumably that of the removed
    /// block — confirm against the store implementations).
    Removed(Cid),
    }
/// Non-fatal failure reported by [`BlockStore::remove`].
#[derive(Debug)]
pub enum BlockRmError {
    /// No block was found for the given [`Cid`]; [`Repo::remove_block`] only
    /// logs a warning for this case.
    NotFound(Cid),
}
/// Storage backend for blocks addressed by [`Cid`].
///
/// [`Repo`] holds an implementation behind `Arc<dyn BlockStore>` and
/// delegates its block operations to it.
#[async_trait]
pub trait BlockStore: Debug + Send + Sync + 'static {
    /// Prepares the store; called by [`Repo::init`].
    async fn init(&self) -> Result<(), Error>;
    /// Opens the store; called by [`Repo::open`].
    async fn open(&self) -> Result<(), Error>;
    /// Reports whether a block for `cid` is present.
    async fn contains(&self, cid: &Cid) -> Result<bool, Error>;
    /// Fetches the block for `cid`, or `None` when there is none.
    async fn get(&self, cid: &Cid) -> Result<Option<Block>, Error>;
    /// Returns a size for the given CIDs, if available.
    /// NOTE(review): presumably the combined byte size, with `None` when a
    /// block is missing — confirm against the implementations.
    async fn size(&self, cid: &[Cid]) -> Result<Option<usize>, Error>;
    /// Returns a total size for the store (presumably in bytes — confirm).
    async fn total_size(&self) -> Result<usize, Error>;
    /// Stores `block`, returning its CID and a [`BlockPut`] outcome.
    async fn put(&self, block: Block) -> Result<(Cid, BlockPut), Error>;
    /// Removes the block for `cid`.
    ///
    /// The outer `Result` carries store-level errors; the inner one carries
    /// the removal outcome ([`BlockRm`] or [`BlockRmError`]).
    async fn remove(&self, cid: &Cid) -> Result<Result<BlockRm, BlockRmError>, Error>;
    /// Garbage-collects the store and returns the removed CIDs.
    /// [`Repo::cleanup`] passes the pinned CIDs as `references`.
    /// NOTE(review): presumably `references` are the CIDs to keep — confirm.
    async fn remove_garbage(&self, references: BoxStream<'static, Cid>) -> Result<Vec<Cid>, Error>;
    /// Lists the CIDs held by the store.
    async fn list(&self) -> Result<Vec<Cid>, Error>;
    /// Wipes the store; the default implementation does nothing.
    async fn wipe(&self) {}
}
/// Byte-oriented key/value storage backend that is also a [`PinStore`].
///
/// [`Repo`] holds an implementation behind `Arc<dyn DataStore>`.
#[async_trait]
pub trait DataStore: PinStore + Debug + Send + Sync + 'static {
    /// Prepares the store; called by [`Repo::init`].
    async fn init(&self) -> Result<(), Error>;
    /// Opens the store; called by [`Repo::open`].
    async fn open(&self) -> Result<(), Error>;
    /// Reports whether a value is stored under `key`.
    async fn contains(&self, key: &[u8]) -> Result<bool, Error>;
    /// Fetches the value stored under `key`, or `None` when there is none.
    async fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Error>;
    /// Stores `value` under `key`.
    async fn put(&self, key: &[u8], value: &[u8]) -> Result<(), Error>;
    /// Removes the entry stored under `key`.
    async fn remove(&self, key: &[u8]) -> Result<(), Error>;
    /// Streams `(key, value)` pairs; [`Repo::migrate`] uses this to copy
    /// entries into another repository.
    async fn iter(&self) -> futures::stream::BoxStream<'static, (Vec<u8>, Vec<u8>)>;
    /// Wipes the store; the default implementation does nothing.
    async fn wipe(&self) {}
}
/// Garbage-collection configuration.
///
/// NOTE(review): not referenced elsewhere in this file; the field semantics
/// below are assumptions to be confirmed against the GC task.
#[derive(Clone, Copy, Default, PartialEq, Eq, PartialOrd, Ord)]
pub struct GCConfig {
    /// Presumably the interval between GC checks — TODO confirm.
    pub duration: Duration,
    /// Condition under which collection is triggered.
    pub trigger: GCTrigger,
}
/// Condition used by [`GCConfig`] to decide when garbage collection runs.
#[derive(Clone, Copy, Default, PartialEq, Eq, PartialOrd, Ord)]
pub enum GCTrigger {
    /// Trigger at a given `size` (presumably a byte threshold — TODO confirm).
    At {
        size: usize,
    },
    /// NOTE(review): presumably tied to the repository's maximum storage
    /// size — confirm against the GC task.
    AtStorage,
    /// The default variant.
    #[default]
    None,
}
/// Failure modes when acquiring the repository lock.
#[derive(Debug)]
pub enum LockError {
    /// Produced from an I/O error of kind `WouldBlock`.
    RepoInUse,
    /// Any other I/O error; the underlying error is kept as the source.
    LockFileOpenFailed(io::Error),
}

impl fmt::Display for LockError {
    /// Writes a fixed, human-readable message per variant; the wrapped I/O
    /// error is not included (it is available through `source`).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(match self {
            Self::RepoInUse => "The repository is already being used by an IPFS instance.",
            Self::LockFileOpenFailed(_) => "Failed to open repository lock file.",
        })
    }
}

impl From<io::Error> for LockError {
    /// Maps `WouldBlock` to [`LockError::RepoInUse`] and wraps everything else.
    fn from(error: io::Error) -> Self {
        if error.kind() == io::ErrorKind::WouldBlock {
            Self::RepoInUse
        } else {
            Self::LockFileOpenFailed(error)
        }
    }
}

impl error::Error for LockError {
    /// Exposes the wrapped I/O error, when there is one.
    fn source(&self) -> Option<&(dyn error::Error + 'static)> {
        match self {
            Self::LockFileOpenFailed(cause) => Some(cause),
            Self::RepoInUse => None,
        }
    }
}
/// Lock guarding a repository; [`Repo::init`] calls [`Lock::try_exclusive`]
/// before initializing the stores.
pub trait Lock: Debug + Send + Sync + 'static {
    /// Attempts to take the lock, returning a [`LockError`] on failure.
    /// NOTE(review): presumably non-blocking, as the name suggests — confirm
    /// against the implementations.
    fn try_exclusive(&self) -> Result<(), LockError>;
}
/// Boxed stream of CIDs (or reference-walking errors) handed to the recursive
/// pin operations.
type References<'a> = futures::stream::BoxStream<'a, Result<Cid, crate::refs::IpldRefsError>>;
/// Storage backend for pins; a supertrait of [`DataStore`].
#[async_trait]
pub trait PinStore: Debug + Send + Sync + Unpin + 'static {
    /// Reports whether `block` is pinned.
    /// NOTE(review): whether indirect pins count is up to the implementation.
    async fn is_pinned(&self, block: &Cid) -> Result<bool, Error>;
    /// Records a direct pin on `target`.
    async fn insert_direct_pin(&self, target: &Cid) -> Result<(), Error>;
    /// Records a recursive pin on `target`; `referenced` streams the CIDs
    /// reachable from it.
    async fn insert_recursive_pin(
        &self,
        target: &Cid,
        referenced: References<'_>,
    ) -> Result<(), Error>;
    /// Removes a direct pin from `target`.
    async fn remove_direct_pin(&self, target: &Cid) -> Result<(), Error>;
    /// Removes a recursive pin from `target`; `referenced` streams the CIDs
    /// reachable from it.
    async fn remove_recursive_pin(
        &self,
        target: &Cid,
        referenced: References<'_>,
    ) -> Result<(), Error>;
    /// Streams pins together with their [`PinMode`]; `mode` is an optional
    /// filter (`None` is used by callers in this file to request all pins).
    async fn list(
        &self,
        mode: Option<PinMode>,
    ) -> futures::stream::BoxStream<'static, Result<(Cid, PinMode), Error>>;
    /// Looks up the pin kind of each CID in `ids`, optionally restricted to
    /// `requirement`.
    async fn query(
        &self,
        ids: Vec<Cid>,
        requirement: Option<PinMode>,
    ) -> Result<Vec<(Cid, PinKind<Cid>)>, Error>;
}
/// The ways a block can be pinned.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum PinMode {
    Indirect,
    Direct,
    Recursive,
}

/// Internal filter over [`PinMode`]: either one specific mode or any mode.
#[derive(Debug, Clone, Copy)]
enum PinModeRequirement {
    Only(PinMode),
    Any,
}

impl From<Option<PinMode>> for PinModeRequirement {
    /// `Some(mode)` becomes `Only(mode)`; `None` becomes `Any`.
    fn from(filter: Option<PinMode>) -> Self {
        filter.map_or(Self::Any, Self::Only)
    }
}

impl PinModeRequirement {
    /// True when indirect pins are acceptable: either no restriction at all
    /// or a restriction to exactly [`PinMode::Indirect`].
    fn is_indirect_or_any(&self) -> bool {
        matches!(self, Self::Any | Self::Only(PinMode::Indirect))
    }

    /// True when `other` satisfies this requirement.
    fn matches<P: PartialEq<PinMode>>(&self, other: &P) -> bool {
        match self {
            Self::Any => true,
            Self::Only(wanted) => other == wanted,
        }
    }

    /// The single required mode, if the requirement is restricted to one.
    fn required(&self) -> Option<PinMode> {
        match *self {
            Self::Only(mode) => Some(mode),
            Self::Any => None,
        }
    }
}
impl<B: Borrow<Cid>> PartialEq<PinMode> for PinKind<B> {
    /// Compares a detailed pin kind against a coarse [`PinMode`].
    ///
    /// Both `Recursive(_)` and `RecursiveIntention` count as
    /// [`PinMode::Recursive`]; payloads are ignored.
    fn eq(&self, other: &PinMode) -> bool {
        match self {
            PinKind::IndirectFrom(_) => matches!(other, PinMode::Indirect),
            PinKind::Direct => matches!(other, PinMode::Direct),
            PinKind::Recursive(_) | PinKind::RecursiveIntention => {
                matches!(other, PinMode::Recursive)
            }
        }
    }
}
/// Detailed description of how a block is pinned, generic over how the
/// related [`Cid`] is held (owned or borrowed).
#[derive(Debug, PartialEq, Eq)]
pub enum PinKind<C: Borrow<Cid>> {
    /// Indirectly pinned; carries a CID (presumably the pin it is reachable
    /// from — confirm against the pin store implementations).
    IndirectFrom(C),
    /// Directly pinned.
    Direct,
    /// Recursively pinned; carries a count (NOTE(review): its exact meaning
    /// is not visible here).
    Recursive(u64),
    /// Treated as [`PinMode::Recursive`] when compared against a `PinMode`.
    RecursiveIntention,
}
impl<C: Borrow<Cid>> PinKind<C> {
    fn as_ref(&self) -> PinKind<&'_ Cid> {
        use PinKind::*;
        match self {
            IndirectFrom(c) => PinKind::IndirectFrom(c.borrow()),
            Direct => PinKind::Direct,
            Recursive(count) => PinKind::Recursive(*count),
            RecursiveIntention => PinKind::RecursiveIntention,
        }
    }
}
/// Handle to a repository: a block store, a data store (which also stores
/// pins) and a lock, plus the runtime state shared between clones.
///
/// Every field is behind an `Arc`, so cloning a `Repo` yields another handle
/// to the same state.
// The `subscriptions` field type is deeply nested, hence the clippy allow.
#[allow(clippy::type_complexity)]
#[derive(Debug, Clone)]
pub struct Repo {
    /// Set by `set_online`, cleared by `set_offline`.
    online: Arc<AtomicBool>,
    /// Set once `init` has succeeded for both stores.
    initialized: Arc<AtomicBool>,
    /// Value accessed via `set_max_storage_size` / `max_storage_size`.
    max_storage_size: Arc<AtomicUsize>,
    block_store: Arc<dyn BlockStore>,
    data_store: Arc<dyn DataStore>,
    /// Sender half of the event channel; `None` until `initialize_channel`
    /// is called and again after `shutdown`.
    events: Arc<RwLock<Option<Sender<RepoEvent>>>>,
    /// Per-CID waiters that are completed by `put_block` when the block
    /// arrives.
    pub(crate) subscriptions:
        Arc<Mutex<HashMap<Cid, Vec<futures::channel::oneshot::Sender<Result<Block, String>>>>>>,
    /// Lock tried by `init` before the stores are initialized.
    lockfile: Arc<dyn Lock>,
    /// Read-locked by block put/get/remove, write-locked by `cleanup`.
    gclock: Arc<tokio::sync::RwLock<()>>,
}
/// Bitswap store adapter: serves blocks from the local block store only.
#[async_trait]
impl beetle_bitswap_next::Store for Repo {
    /// Returns the byte length of the locally stored block for `cid`.
    ///
    /// # Errors
    /// Fails when the block store errors or the block is not stored locally.
    async fn get_size(&self, cid: &Cid) -> anyhow::Result<usize> {
        self.get_block_now(cid)
            .await?
            .map(|block| block.data().len())
            // Build the error lazily so the success path does not allocate it.
            .ok_or_else(|| anyhow::anyhow!("Block doesnt exist"))
    }

    /// Returns a bitswap block holding a copy of the locally stored data for
    /// `cid`.
    ///
    /// # Errors
    /// Fails when the block store errors or the block is not stored locally.
    async fn get(&self, cid: &Cid) -> anyhow::Result<beetle_bitswap_next::Block> {
        let block = self
            .get_block_now(cid)
            .await?
            .ok_or_else(|| anyhow::anyhow!("Block doesnt exist"))?;
        Ok(beetle_bitswap_next::Block {
            cid: *block.cid(),
            data: bytes::Bytes::copy_from_slice(block.data()),
        })
    }

    /// Reports whether the block for `cid` is stored locally.
    async fn has(&self, cid: &Cid) -> anyhow::Result<bool> {
        self.contains(cid).await
    }
}
/// Events sent over the channel created by `Repo::initialize_channel`.
#[derive(Debug)]
pub enum RepoEvent {
    /// Blocks are wanted from the network: optional session id, the CIDs and
    /// the peers passed by the caller.
    WantBlock(Option<u64>, Vec<Cid>, Vec<PeerId>),
    /// NOTE(review): not emitted anywhere in this file; presumably cancels a
    /// previous want.
    UnwantBlock(Cid),
    /// Sent by `Repo::put_block` when the store reports a new block.
    NewBlock(Block),
    /// Sent by `Repo::remove_block` for every block it removed.
    RemovedBlock(Cid),
}
impl Repo {
    pub fn new(repo_type: StoragePath, duration: Option<Duration>) -> Self {
        match repo_type {
            StoragePath::Memory => Repo::new_memory(duration),
            StoragePath::Disk(path) => Repo::new_fs(path, duration),
            StoragePath::Custom {
                blockstore,
                datastore,
                lock,
            } => Repo::new_raw(blockstore, datastore, lock),
        }
    }
    pub fn new_raw(
        block_store: Arc<dyn BlockStore>,
        data_store: Arc<dyn DataStore>,
        lockfile: Arc<dyn Lock>,
    ) -> Self {
        Repo {
            initialized: Arc::default(),
            online: Arc::default(),
            block_store,
            data_store,
            events: Arc::default(),
            subscriptions: Default::default(),
            lockfile,
            max_storage_size: Arc::new(AtomicUsize::new(0)),
            gclock: Arc::default(),
        }
    }
    /// Builds a repository rooted at `path` on the file system.
    ///
    /// Uses `<path>/blockstore`, `<path>/datastore` and `<path>/repo_lock`.
    /// The data store backend is chosen by the `sled_data_store` /
    /// `redb_data_store` features, falling back to the flat-file store.
    /// `duration` defaults to two minutes and is handed to the block store.
    pub fn new_fs(path: impl AsRef<Path>, duration: Option<Duration>) -> Self {
        let root = path.as_ref();
        let duration = duration.unwrap_or(Duration::from_secs(60 * 2));

        let block_store = Arc::new(blockstore::flatfs::FsBlockStore::new(
            root.join("blockstore"),
            duration,
        ));

        let datastore_path = root.join("datastore");
        #[cfg(not(any(feature = "sled_data_store", feature = "redb_data_store")))]
        let data_store = Arc::new(datastore::flatfs::FsDataStore::new(datastore_path));
        #[cfg(feature = "sled_data_store")]
        let data_store = Arc::new(datastore::sled::SledDataStore::new(datastore_path));
        #[cfg(feature = "redb_data_store")]
        let data_store = Arc::new(datastore::redb::RedbDataStore::new(datastore_path));

        let lockfile = Arc::new(lock::FsLock::new(root.join("repo_lock")));
        Self::new_raw(block_store, data_store, lockfile)
    }
    /// Builds a purely in-memory repository.
    ///
    /// `duration` defaults to two minutes and is handed to the block store.
    pub fn new_memory(duration: Option<Duration>) -> Self {
        let configured = duration.unwrap_or(Duration::from_secs(60 * 2));
        Self::new_raw(
            Arc::new(blockstore::memory::MemBlockStore::new(
                Default::default(),
                configured,
            )),
            Arc::new(datastore::memory::MemDataStore::new(Default::default())),
            Arc::new(lock::MemLock),
        )
    }
    /// Stores `size` as the maximum storage size shared by all clones of
    /// this repository.
    pub fn set_max_storage_size(&self, size: usize) {
        self.max_storage_size.store(size, Ordering::SeqCst);
    }
    /// Returns the currently stored maximum storage size (`0` until set).
    pub fn max_storage_size(&self) -> usize {
        self.max_storage_size.load(Ordering::SeqCst)
    }
    pub async fn migrate(&self, repo: &Self) -> Result<(), Error> {
        if self.is_online() || repo.is_online() {
            anyhow::bail!("Repository cannot be online");
        }
        let block_migration = {
            let this = self.clone();
            let external = repo.clone();
            async move {
                if let Ok(list) = this.list_blocks().await {
                    for cid in list {
                        match this.get_block_now(&cid).await {
                            Ok(Some(block)) => match external.block_store.put(block).await {
                                Ok(_) => {}
                                Err(e) => error!("Error migrating {cid}: {e}"),
                            },
                            Ok(None) => error!("{cid} doesnt exist"),
                            Err(e) => error!("Error getting block {cid}: {e}"),
                        }
                    }
                }
            }
        };
        let data_migration = {
            let this = self.clone();
            let external = repo.clone();
            async move {
                let mut data_stream = this.data_store().iter().await;
                while let Some((k, v)) = data_stream.next().await {
                    if let Err(e) = external.data_store().put(&k, &v).await {
                        error!("Unable to migrate {k:?} into repo: {e}");
                    }
                }
            }
        };
        let pins_migration = {
            let this = self.clone();
            let external = repo.clone();
            async move {
                let mut stream = this.data_store().list(None).await;
                while let Some(Ok((cid, pin_mode))) = stream.next().await {
                    match pin_mode {
                        PinMode::Direct => {
                            match external.data_store().insert_direct_pin(&cid).await {
                                Ok(_) => {}
                                Err(e) => error!("Unable to migrate pin {cid}: {e}"),
                            }
                        }
                        PinMode::Indirect => {
                            continue;
                        }
                        PinMode::Recursive => {
                            let block = match this.get_block_now(&cid).await.map(|block| {
                                block.and_then(|block| block.decode::<IpldCodec, Ipld>().ok())
                            }) {
                                Ok(Some(block)) => block,
                                Ok(None) => continue,
                                Err(e) => {
                                    error!("Block {cid} does not exist but is pinned: {e}");
                                    continue;
                                }
                            };
                            let st = crate::refs::IpldRefs::default()
                                .with_only_unique()
                                .refs_of_resolved(self, vec![(cid, block.clone())].into_iter())
                                .map_ok(|crate::refs::Edge { destination, .. }| destination)
                                .into_stream()
                                .boxed();
                            if let Err(e) = external.insert_recursive_pin(&cid, st).await {
                                error!("Error migrating pin {cid}: {e}");
                                continue;
                            }
                        }
                    }
                }
            }
        };
        futures::join!(block_migration, data_migration, pins_migration);
        Ok(())
    }
    /// Creates the event channel (capacity 1), installs its sender, marks
    /// the repository online and returns the receiving half.
    ///
    /// Debug builds assert that no sender was installed before.
    pub(crate) fn initialize_channel(&self) -> Receiver<RepoEvent> {
        let (tx, rx) = channel(1);
        let mut slot = self.events.write();
        debug_assert!(slot.is_none());
        *slot = Some(tx);
        // Still holding the write guard here, as the sender and the online
        // flag are updated together.
        self.set_online();
        rx
    }
    /// Drops all pending block subscriptions, closes and removes the event
    /// channel sender (if any) and marks the repository offline.
    pub fn shutdown(&self) {
        {
            // Dropping the senders wakes every waiter with a cancellation.
            let mut pending = self.subscriptions.lock();
            pending.clear();
        }
        if let Some(mut sender) = self.events.write().take() {
            sender.close_channel();
        }
        self.set_offline();
    }
    /// Returns the current value of the online flag.
    pub fn is_online(&self) -> bool {
        self.online.load(Ordering::SeqCst)
    }
    /// Raises the online flag unless it is already set.
    pub(crate) fn set_online(&self) {
        if !self.is_online() {
            self.online.store(true, Ordering::SeqCst);
        }
    }
    /// Clears the online flag unless it is already cleared.
    pub(crate) fn set_offline(&self) {
        if self.is_online() {
            self.online.store(false, Ordering::SeqCst);
        }
    }
    /// Returns a clone of the event sender, or `None` when no channel is
    /// currently installed.
    fn repo_channel(&self) -> Option<Sender<RepoEvent>> {
        self.events.read().clone()
    }
    /// Takes the repository lock and initializes both stores concurrently.
    ///
    /// Returns immediately when the repository is already initialized. The
    /// initialized flag is only set when both stores succeed; if both fail,
    /// the block store's error is the one reported.
    pub async fn init(&self) -> Result<(), Error> {
        if self.initialized.load(Ordering::SeqCst) {
            return Ok(());
        }
        log::debug!("Trying lockfile");
        self.lockfile.try_exclusive()?;
        log::debug!("lockfile tried");

        let (blocks, data) =
            futures::future::join(self.block_store.init(), self.data_store.init()).await;
        blocks?;
        data?;
        self.initialized.store(true, Ordering::SeqCst);
        Ok(())
    }
    /// Opens both stores concurrently.
    ///
    /// If both fail, the block store's error is the one reported.
    pub async fn open(&self) -> Result<(), Error> {
        let (blocks, data) =
            futures::future::join(self.block_store.open(), self.data_store.open()).await;
        // `and` yields the first error, or the second result otherwise.
        blocks.and(data)
    }
    /// Stores `block` and, when the store reports it as new, announces it.
    ///
    /// For a new block a [`RepoEvent::NewBlock`] is sent (if a channel is
    /// installed) and every subscriber waiting on its CID receives a copy.
    /// Nothing is announced for a block that already existed. The GC lock is
    /// held for reading for the duration of the call.
    pub async fn put_block(&self, block: Block) -> Result<(Cid, BlockPut), Error> {
        let _guard = self.gclock.read().await;
        let (cid, outcome) = self.block_store.put(block.clone()).await?;
        if outcome != BlockPut::NewBlock {
            return Ok((cid, outcome));
        }

        if let Some(mut events) = self.repo_channel() {
            let _ = events.send(RepoEvent::NewBlock(block.clone())).await;
        }

        // The mutex guard is released at the end of this statement, before
        // any waiter is notified.
        let waiters = self.subscriptions.lock().remove(&cid).unwrap_or_default();
        for waiter in waiters {
            // A waiter that has gone away is not an error.
            let _ = waiter.send(Ok(block.clone()));
        }
        Ok((cid, outcome))
    }
    /// Fetches a single block without a session and with the default
    /// timeout; see `get_block_with_session`.
    #[inline]
    pub async fn get_block(
        &self,
        cid: &Cid,
        peers: &[PeerId],
        local_only: bool,
    ) -> Result<Block, Error> {
        self.get_block_with_session(None, cid, peers, local_only, None)
            .await
    }
    /// Fetches several blocks without a session and with the default
    /// timeout; see `get_blocks_with_session`.
    #[inline]
    pub async fn get_blocks(
        &self,
        cids: &[Cid],
        peers: &[PeerId],
        local_only: bool,
    ) -> Result<BoxStream<'static, Result<Block, Error>>, Error> {
        self.get_blocks_with_session(None, cids, peers, local_only, None)
            .await
    }
    /// Delegates to [`BlockStore::size`] for the given CIDs.
    #[inline]
    pub async fn get_blocks_size(&self, cids: &[Cid]) -> Result<Option<usize>, Error> {
        self.block_store.size(cids).await
    }
    /// Delegates to [`BlockStore::total_size`].
    #[inline]
    pub async fn get_total_size(&self) -> Result<usize, Error> {
        self.block_store.total_size().await
    }
    /// Resolves `cids` to blocks, asking the network for the ones that are
    /// not stored locally.
    ///
    /// Locally available blocks are yielded first (in input order), followed
    /// by one pending entry per missing CID. Each pending entry resolves when
    /// `put_block` stores that block, or fails after `timeout` (60 seconds
    /// when `None`). A local lookup that errors is treated as a miss.
    ///
    /// # Errors
    /// Fails when blocks are missing and either `local_only` is set, the
    /// repository is offline, or no event channel is installed.
    pub(crate) async fn get_blocks_with_session(
        &self,
        session: Option<u64>,
        cids: &[Cid],
        peers: &[PeerId],
        local_only: bool,
        timeout: Option<Duration>,
    ) -> Result<BoxStream<'static, Result<Block, Error>>, Error> {
        let _guard = self.gclock.read().await;
        let mut blocks = FuturesOrdered::new();
        let mut missing = Vec::new();
        for cid in cids {
            match self.get_block_now(cid).await {
                Ok(Some(block)) => blocks.push_back(async { Ok(block) }.boxed()),
                Ok(None) | Err(_) => missing.push(*cid),
            }
        }
        if missing.is_empty() {
            return Ok(blocks.boxed());
        }
        if local_only || !self.is_online() {
            anyhow::bail!("Unable to locate missing blocks {missing:?}");
        }
        // Grab the channel before registering any subscription, so that a
        // missing channel cannot leave orphaned senders behind in the map.
        let mut events = self
            .repo_channel()
            .ok_or_else(|| anyhow::anyhow!("Channel is not available"))?;
        let timeout = timeout.unwrap_or(Duration::from_secs(60));
        for &cid in &missing {
            let (tx, rx) = futures::channel::oneshot::channel();
            self.subscriptions.lock().entry(cid).or_default().push(tx);
            let task = async move {
                // Three failure layers: timeout elapsed, sender dropped
                // (e.g. on shutdown), or an error reported by the sender.
                let block = tokio::time::timeout(timeout, rx)
                    .await
                    .map_err(|_| anyhow::anyhow!("Timeout while resolving {cid}"))??
                    .map_err(|e| anyhow!("{e}"))?;
                Ok::<_, anyhow::Error>(block)
            }
            .boxed();
            blocks.push_back(task);
        }
        // Only ask the network for what is actually missing; blocks already
        // found locally must not be requested again.
        events
            .send(RepoEvent::WantBlock(session, missing, peers.to_vec()))
            .await
            .ok();
        Ok(blocks.boxed())
    }
    /// Single-block convenience over `get_blocks_with_session`: requests
    /// `cid` and returns the first (only) result of the stream.
    ///
    /// # Errors
    /// Propagates any error from the lookup, and fails when the stream
    /// yields nothing.
    pub(crate) async fn get_block_with_session(
        &self,
        session: Option<u64>,
        cid: &Cid,
        peers: &[PeerId],
        local_only: bool,
        timeout: Option<Duration>,
    ) -> Result<Block, Error> {
        let wanted = std::slice::from_ref(cid);
        let mut stream = self
            .get_blocks_with_session(session, wanted, peers, local_only, timeout)
            .await?;
        match stream.next().await {
            Some(outcome) => outcome,
            None => Err(anyhow::anyhow!("Unable to locate {} block", *cid)),
        }
    }
    /// Looks `cid` up in the local block store only; never touches the
    /// network. Returns `None` when the store has no such block.
    pub async fn get_block_now(&self, cid: &Cid) -> Result<Option<Block>, Error> {
        self.block_store.get(cid).await
    }
    /// Delegates to [`BlockStore::contains`] for `cid`.
    pub async fn contains(&self, cid: &Cid) -> Result<bool, Error> {
        self.block_store.contains(cid).await
    }
    /// Delegates to [`BlockStore::list`].
    pub async fn list_blocks(&self) -> Result<Vec<Cid>, Error> {
        self.block_store.list().await
    }
    /// Removes the block for `cid` and, when `recursive` is set, every block
    /// reachable from it.
    ///
    /// Pinned blocks encountered along the way are skipped. Each removal
    /// emits a [`RepoEvent::RemovedBlock`] when a channel is installed;
    /// blocks the store cannot find are logged and skipped. Returns the CIDs
    /// that were actually removed.
    ///
    /// # Errors
    /// Fails when `cid` itself is pinned, when collecting references fails,
    /// or when the stores report an error.
    pub async fn remove_block(&self, cid: &Cid, recursive: bool) -> Result<Vec<Cid>, Error> {
        let _guard = self.gclock.read().await;
        if self.is_pinned(cid).await? {
            return Err(anyhow::anyhow!("block to remove is pinned"));
        }

        let targets: BTreeSet<Cid> = if recursive {
            let mut reachable = self.recursive_collections(*cid).await?;
            reachable.insert(*cid);
            reachable
        } else {
            std::iter::once(*cid).collect()
        };

        let mut removed = Vec::new();
        for cid in targets {
            if self.is_pinned(&cid).await? {
                continue;
            }
            match self.block_store.remove(&cid).await? {
                Ok(BlockRm::Removed(_)) => {
                    if let Some(mut events) = self.repo_channel() {
                        let _ = events.send(RepoEvent::RemovedBlock(cid)).await;
                    }
                    removed.push(cid);
                }
                Err(BlockRmError::NotFound(cid)) => warn!("{cid} is not found to be removed"),
            }
        }
        Ok(removed)
    }
    /// Collects every CID reachable from `cid` through block references.
    ///
    /// The starting `cid` itself is not part of the result. The walk uses an
    /// explicit worklist and expands each CID only once, so sub-graphs that
    /// are shared between several parents are not traversed repeatedly.
    ///
    /// # Errors
    /// Fails when `cid` or any reachable block is not stored locally, when
    /// the block store errors, or when a block's references cannot be read.
    fn recursive_collections(&self, cid: Cid) -> BoxFuture<'_, anyhow::Result<BTreeSet<Cid>>> {
        async move {
            let mut collected: BTreeSet<Cid> = BTreeSet::new();
            let mut pending = vec![cid];
            while let Some(current) = pending.pop() {
                let block = self
                    .get_block_now(&current)
                    .await?
                    .ok_or_else(|| anyhow::anyhow!("Block does not exist"))?;
                let mut references: BTreeSet<Cid> = BTreeSet::new();
                block.references(&mut references)?;
                for reference in references {
                    // `insert` returns false for CIDs seen before, which
                    // keeps them from being queued a second time.
                    if collected.insert(reference) {
                        pending.push(reference);
                    }
                }
            }
            Ok(collected)
        }
        .boxed()
    }
    /// Reads the path stored under the data store key `ipns/<peer id>`.
    ///
    /// Returns `None` when nothing is stored. The stored bytes are decoded
    /// as UTF-8 lossily before being parsed as an [`IpfsPath`].
    pub async fn get_ipns(&self, ipns: &PeerId) -> Result<Option<IpfsPath>, Error> {
        use std::str::FromStr;
        let key = format!("ipns/{ipns}");
        match self.data_store.get(key.as_bytes()).await? {
            None => Ok(None),
            Some(raw) => {
                let text = String::from_utf8_lossy(&raw);
                Ok(Some(IpfsPath::from_str(&text)?))
            }
        }
    }
    /// Stores the string form of `path` under the data store key
    /// `ipns/<peer id>`.
    pub async fn put_ipns(&self, ipns: &PeerId, path: &IpfsPath) -> Result<(), Error> {
        let key = format!("ipns/{ipns}");
        let value = path.to_string();
        self.data_store.put(key.as_bytes(), value.as_bytes()).await
    }
    /// Removes the entry stored under the data store key `ipns/<peer id>`.
    pub async fn remove_ipns(&self, ipns: &PeerId) -> Result<(), Error> {
        let key = format!("ipns/{ipns}");
        self.data_store.remove(key.as_bytes()).await
    }
    /// Pins `cid`, either directly or recursively.
    ///
    /// The block is fetched first in both cases (from the network too,
    /// unless `local_only` is set), so pinning fails when it cannot be
    /// obtained. A recursive pin decodes the block and streams its unique
    /// references to the pin store.
    pub async fn insert_pin(
        &self,
        cid: &Cid,
        recursive: bool,
        local_only: bool,
    ) -> Result<(), Error> {
        let block = self.get_block(cid, &[], local_only).await?;
        if recursive {
            let ipld = block.decode::<IpldCodec, Ipld>()?;
            let references = crate::refs::IpldRefs::default()
                .with_only_unique()
                .refs_of_resolved(self, vec![(*cid, ipld)].into_iter())
                .map_ok(|crate::refs::Edge { destination, .. }| destination)
                .into_stream()
                .boxed();
            self.insert_recursive_pin(cid, references).await
        } else {
            self.insert_direct_pin(cid).await
        }
    }
    /// Delegates to [`PinStore::insert_direct_pin`] on the data store.
    pub async fn insert_direct_pin(&self, cid: &Cid) -> Result<(), Error> {
        self.data_store.insert_direct_pin(cid).await
    }
    /// Delegates to [`PinStore::insert_recursive_pin`] on the data store.
    pub async fn insert_recursive_pin(&self, cid: &Cid, refs: References<'_>) -> Result<(), Error> {
        self.data_store.insert_recursive_pin(cid, refs).await
    }
    /// Delegates to [`PinStore::remove_direct_pin`] on the data store.
    pub async fn remove_direct_pin(&self, cid: &Cid) -> Result<(), Error> {
        self.data_store.remove_direct_pin(cid).await
    }
    /// Delegates to [`PinStore::remove_recursive_pin`] on the data store.
    pub async fn remove_recursive_pin(&self, cid: &Cid, refs: References<'_>) -> Result<(), Error> {
        self.data_store.remove_recursive_pin(cid, refs).await
    }
    /// Runs garbage collection on the block store and returns the removed
    /// CIDs.
    ///
    /// Takes the GC lock for writing, so block puts, gets and removals wait
    /// until it finishes. The CIDs of all listed pins (every mode) are
    /// passed to the block store as the reference set.
    ///
    /// # Errors
    /// Fails when listing pins yields an error or the block store fails.
    pub async fn cleanup(&self) -> Result<Vec<Cid>, Error> {
        let _guard = self.gclock.write().await;
        let pinned: BTreeSet<Cid> = self
            .list_pins(None)
            .await
            .map_ok(|(cid, _)| cid)
            .try_collect()
            .await?;
        let references = futures::stream::iter(pinned).boxed();
        self.block_store.remove_garbage(references).await
    }
    /// Delegates to [`PinStore::is_pinned`] on the data store.
    pub async fn is_pinned(&self, cid: &Cid) -> Result<bool, Error> {
        self.data_store.is_pinned(cid).await
    }
    /// Delegates to [`PinStore::list`] on the data store, with `mode` as the
    /// optional filter.
    pub async fn list_pins(
        &self,
        mode: Option<PinMode>,
    ) -> futures::stream::BoxStream<'static, Result<(Cid, PinMode), Error>> {
        self.data_store.list(mode).await
    }
    /// Delegates to [`PinStore::query`] on the data store.
    pub async fn query_pins(
        &self,
        cids: Vec<Cid>,
        requirement: Option<PinMode>,
    ) -> Result<Vec<(Cid, PinKind<Cid>)>, Error> {
        self.data_store.query(cids, requirement).await
    }
}
impl Repo {
    /// Borrows the underlying data store as a trait object.
    pub fn data_store(&self) -> &dyn DataStore {
        self.data_store.as_ref()
    }
}