Skip to main content

rust_ipfs/repo/
mod.rs

1//! Storage implementation(s) backing the [`crate::Ipfs`].
2use crate::error::Error;
3use crate::Block;
4use connexa::prelude::identity::PeerId;
5use core::fmt::Debug;
6use futures::channel::mpsc::{channel, Receiver, Sender};
7use futures::future::{BoxFuture, Either};
8use futures::sink::SinkExt;
9use futures::stream::{self, BoxStream, FuturesOrdered};
10use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt};
11use indexmap::IndexSet;
12use ipld_core::cid::Cid;
13use parking_lot::{Mutex, RwLock};
14use std::borrow::Borrow;
15use std::collections::{BTreeSet, HashMap};
16use std::future::{Future, IntoFuture};
17#[allow(unused_imports)]
18use std::path::Path;
19use std::pin::Pin;
20use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
21use std::sync::Arc;
22use std::task::{Context, Poll};
23use std::time::Duration;
24use std::{error, fmt, io};
25use tokio::sync::{Notify, RwLockReadGuard};
26use tracing::{Instrument, Span};
27
28#[macro_use]
29#[cfg(test)]
30mod common_tests;
31
32pub use store::{blockstore, datastore, default_impl::DefaultStorage};
33
34pub mod lock;
35
36/// Path mangling done for pins and blocks
37#[cfg(not(target_arch = "wasm32"))]
38pub(crate) mod paths;
39mod store;
40
41/// Describes the outcome of `BlockStore::put_block`.
42#[derive(Debug, PartialEq, Eq)]
43pub enum BlockPut {
44    /// A new block was written to the blockstore.
45    NewBlock,
46    /// The block already exists.
47    Existed,
48}
49
50/// Describes the outcome of `BlockStore::remove`.
51#[derive(Debug)]
52pub enum BlockRm {
53    /// A block was successfully removed from the blockstore.
54    Removed(Cid),
55    // TODO: DownloadCancelled(Cid, Duration),
56}
57
58// pub struct BlockNotFound(Cid);
59/// Describes the error variants for `BlockStore::remove`.
60#[derive(Debug)]
61pub enum BlockRmError {
62    // TODO: Pinned(Cid),
63    /// The `Cid` doesn't correspond to a block in the blockstore.
64    NotFound(Cid),
65}
66
67pub trait StoreOpt {
68    fn into_opt(self) -> Self;
69}
70
71/// This API is being discussed and evolved, which will likely lead to breakage.
72pub trait BlockStore: Debug + Send + Sync {
73    fn init(&self) -> impl Future<Output = Result<(), Error>> + Send;
74
75    /// Returns whether a block is present in the blockstore.
76    fn contains(&self, cid: &Cid) -> impl Future<Output = Result<bool, Error>> + Send;
77    /// Returns a block from the blockstore.
78    fn get(&self, cid: &Cid) -> impl Future<Output = Result<Option<Block>, Error>> + Send;
79    /// Get the size of a single block
80    fn size(&self, cid: &[Cid]) -> impl Future<Output = Result<Option<usize>, Error>> + Send;
81    /// Get a total size of the block store
82    fn total_size(&self) -> impl Future<Output = Result<usize, Error>> + Send;
83    /// Inserts a block in the blockstore.
84    fn put(&self, block: &Block) -> impl Future<Output = Result<(Cid, BlockPut), Error>> + Send;
85    /// Removes a block from the blockstore.
86    fn remove(&self, cid: &Cid) -> impl Future<Output = Result<(), Error>> + Send;
87    /// Remove multiple blocks from the blockstore
88    fn remove_many(
89        &self,
90        blocks: BoxStream<'static, Cid>,
91    ) -> impl Future<Output = BoxStream<'static, Cid>> + Send;
92    /// Returns a list of the blocks (Cids), in the blockstore.
93    fn list(&self) -> impl Future<Output = BoxStream<'static, Cid>> + Send;
94}
95
96/// Generic layer of abstraction for a key-value data store.
97pub trait DataStore: PinStore + Debug + Send + Sync {
98    fn init(&self) -> impl Future<Output = Result<(), Error>> + Send;
99    /// Checks if a key is present in the datastore.
100    fn contains(&self, key: &[u8]) -> impl Future<Output = Result<bool, Error>> + Send;
101    /// Returns the value associated with a key from the datastore.
102    fn get(&self, key: &[u8]) -> impl Future<Output = Result<Option<Vec<u8>>, Error>> + Send;
103    /// Puts the value under the key in the datastore.
104    fn put(&self, key: &[u8], value: &[u8]) -> impl Future<Output = Result<(), Error>> + Send;
105    /// Removes a key-value pair from the datastore.
106    fn remove(&self, key: &[u8]) -> impl Future<Output = Result<(), Error>> + Send;
107    /// Iterate over the k/v of the datastore
108    fn iter(
109        &self,
110    ) -> impl Future<Output = futures::stream::BoxStream<'static, (Vec<u8>, Vec<u8>)>> + Send;
111}
112
113#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
114pub struct GCConfig {
115    /// How long until GC runs
116    /// If duration is not set, it will not run at a timer
117    pub duration: Duration,
118
119    /// What will trigger GC
120    pub trigger: GCTrigger,
121}
122
123impl Default for GCConfig {
124    fn default() -> Self {
125        Self {
126            duration: Duration::from_secs(60 * 60),
127            trigger: GCTrigger::default(),
128        }
129    }
130}
131
132#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, PartialOrd, Ord)]
133pub enum GCTrigger {
134    /// At a specific size. If the size is at or exceeds, it will trigger GC
135    At {
136        size: usize,
137    },
138
139    AtStorage,
140
141    /// No trigger
142    #[default]
143    None,
144}
145
146/// Errors variants describing the possible failures for `Lock::try_exclusive`.
147#[derive(Debug)]
148pub enum LockError {
149    RepoInUse,
150    LockFileOpenFailed(io::Error),
151}
152
153impl fmt::Display for LockError {
154    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
155        let msg = match self {
156            LockError::RepoInUse => "The repository is already being used by an IPFS instance.",
157            LockError::LockFileOpenFailed(_) => "Failed to open repository lock file.",
158        };
159
160        write!(f, "{msg}")
161    }
162}
163
164impl From<io::Error> for LockError {
165    fn from(error: io::Error) -> Self {
166        match error.kind() {
167            // `WouldBlock` is not used by `OpenOptions` (this could change), and can therefore be
168            // matched on for the fs2 error in `FsLock::try_exclusive`.
169            io::ErrorKind::WouldBlock => LockError::RepoInUse,
170            _ => LockError::LockFileOpenFailed(error),
171        }
172    }
173}
174
175impl error::Error for LockError {
176    fn source(&self) -> Option<&(dyn error::Error + 'static)> {
177        if let Self::LockFileOpenFailed(error) = self {
178            Some(error)
179        } else {
180            None
181        }
182    }
183}
184
185/// A trait for describing repository locking.
186///
187/// This ensures no two IPFS nodes can be started with the same peer ID, as exclusive access to the
188/// repository is guaranteed. This is most useful when using a fs backed repo.
189pub trait Lock: Debug + Send + Sync {
190    // fn new(path: PathBuf) -> Self;
191    fn try_exclusive(&self) -> Result<(), LockError>;
192}
193
194type References<'a> = BoxStream<'a, Result<Cid, crate::refs::IpldRefsError>>;
195
196pub trait PinStore: Debug + Send + Sync {
197    fn is_pinned(&self, block: &Cid) -> impl Future<Output = Result<bool, Error>> + Send;
198
199    fn insert_direct_pin(&self, target: &Cid) -> impl Future<Output = Result<(), Error>> + Send;
200
201    fn insert_recursive_pin(
202        &self,
203        target: &Cid,
204        referenced: References<'_>,
205    ) -> impl Future<Output = Result<(), Error>> + Send;
206
207    fn remove_direct_pin(&self, target: &Cid) -> impl Future<Output = Result<(), Error>> + Send;
208
209    fn remove_recursive_pin(
210        &self,
211        target: &Cid,
212        referenced: References<'_>,
213    ) -> impl Future<Output = Result<(), Error>> + Send;
214
215    fn list(
216        &self,
217        mode: Option<PinMode>,
218    ) -> impl Future<Output = futures::stream::BoxStream<'static, Result<(Cid, PinMode), Error>>> + Send;
219
220    /// Returns error if any of the ids isn't pinned in the required type, otherwise returns
221    /// the pin details if all of the cids are pinned in one way or the another.
222    fn query(
223        &self,
224        ids: Vec<Cid>,
225        requirement: Option<PinMode>,
226    ) -> impl Future<Output = Result<Vec<(Cid, PinKind<Cid>)>, Error>> + Send;
227}
228
229/// `PinMode` is the description of pin type for quering purposes.
230#[derive(Debug, PartialEq, Eq, Clone, Copy)]
231pub enum PinMode {
232    Indirect,
233    Direct,
234    Recursive,
235}
236
237/// Helper for around the quite confusing test required in [`PinStore::list`] and
238/// [`PinStore::query`].
239#[derive(Debug, Clone, Copy)]
240enum PinModeRequirement {
241    Only(PinMode),
242    Any,
243}
244
245impl From<Option<PinMode>> for PinModeRequirement {
246    fn from(filter: Option<PinMode>) -> Self {
247        match filter {
248            Some(one) => PinModeRequirement::Only(one),
249            None => PinModeRequirement::Any,
250        }
251    }
252}
253
254#[allow(dead_code)]
255impl PinModeRequirement {
256    fn is_indirect_or_any(&self) -> bool {
257        use PinModeRequirement::*;
258        match self {
259            Only(PinMode::Indirect) | Any => true,
260            Only(_) => false,
261        }
262    }
263
264    fn matches<P: PartialEq<PinMode>>(&self, other: &P) -> bool {
265        use PinModeRequirement::*;
266        match self {
267            Only(one) if other == one => true,
268            Only(_) => false,
269            Any => true,
270        }
271    }
272
273    fn required(&self) -> Option<PinMode> {
274        use PinModeRequirement::*;
275        match self {
276            Only(one) => Some(*one),
277            Any => None,
278        }
279    }
280}
281
282impl<B: Borrow<Cid>> PartialEq<PinMode> for PinKind<B> {
283    fn eq(&self, other: &PinMode) -> bool {
284        matches!(
285            (self, other),
286            (PinKind::IndirectFrom(_), PinMode::Indirect)
287                | (PinKind::Direct, PinMode::Direct)
288                | (PinKind::Recursive(_), PinMode::Recursive)
289                | (PinKind::RecursiveIntention, PinMode::Recursive)
290        )
291    }
292}
293
294/// `PinKind` is more specific pin description for writing purposes. Implements
295/// `PartialEq<&PinMode>`. Generic over `Borrow<Cid>` to allow storing both reference and owned
296/// value of Cid.
297#[derive(Debug, PartialEq, Eq)]
298pub enum PinKind<C: Borrow<Cid>> {
299    IndirectFrom(C),
300    Direct,
301    Recursive(u64),
302    RecursiveIntention,
303}
304
305impl<C: Borrow<Cid>> PinKind<C> {
306    fn as_ref(&self) -> PinKind<&'_ Cid> {
307        match self {
308            PinKind::IndirectFrom(c) => PinKind::IndirectFrom(c.borrow()),
309            PinKind::Direct => PinKind::Direct,
310            PinKind::Recursive(count) => PinKind::Recursive(*count),
311            PinKind::RecursiveIntention => PinKind::RecursiveIntention,
312        }
313    }
314}
315
316type SubscriptionsMap = HashMap<Cid, Vec<futures::channel::oneshot::Sender<Result<Block, String>>>>;
317
318/// Represents the configuration of the Ipfs node, its backing blockstore and datastore.
319pub trait StorageTypes: RepoTypes {}
320impl<T: RepoTypes> StorageTypes for T {}
321
322pub trait RepoTypes: Clone + Send + Sync + 'static {
323    /// Describes a blockstore.
324    type TBlockStore: BlockStore;
325    /// Describes a datastore.
326    type TDataStore: DataStore;
327    type TLock: Lock;
328}
329
330/// Describes a repo.
331/// Consolidates a blockstore, a datastore and a subscription registry.
332#[derive(Debug, Clone)]
333pub struct Repo<S: RepoTypes> {
334    pub(crate) inner: Arc<RepoInner<S>>,
335}
336
337#[derive(Debug)]
338pub(crate) struct RepoInner<S: RepoTypes> {
339    online: AtomicBool,
340    initialized: AtomicBool,
341    max_storage_size: AtomicUsize,
342    block_store: S::TBlockStore,
343    data_store: S::TDataStore,
344    events: RwLock<Option<Sender<RepoEvent>>>,
345    pub(crate) subscriptions: Mutex<SubscriptionsMap>,
346    lockfile: S::TLock,
347    pub(crate) gclock: tokio::sync::RwLock<()>,
348}
349
350/// Events used to communicate to the swarm on repo changes.
351#[derive(Debug)]
352pub enum RepoEvent {
353    /// Signals a desired block.
354    WantBlock(
355        Vec<Cid>,
356        Vec<PeerId>,
357        Option<Duration>,
358        Option<HashMap<Cid, Vec<Arc<Notify>>>>,
359    ),
360    /// Signals a desired block is no longer wanted.
361    UnwantBlock(Cid),
362    /// Signals the posession of a new block.
363    NewBlock(Block),
364    /// Signals the removal of a block.
365    RemovedBlock(Cid),
366}
367
368impl Repo<DefaultStorage> {
369    // pub fn new(repo_type: &mut StorageType) -> Self {
370    //     match repo_type {
371    //         StorageType::Memory => Repo::new_memory(),
372    //         #[cfg(not(target_arch = "wasm32"))]
373    //         StorageType::Disk(path) => Repo::new_fs(path),
374    //         #[cfg(target_arch = "wasm32")]
375    //         StorageType::IndexedDb { namespace } => Repo::new_idb(namespace.take()),
376    //         StorageType::Custom {
377    //             blockstore,
378    //             datastore,
379    //             lock,
380    //         } => Repo::new_raw(
381    //             blockstore.take().expect("Requires blockstore"),
382    //             datastore.take().expect("Requires datastore"),
383    //             lock.take()
384    //                 .expect("Requires lockfile for data and block store"),
385    //         ),
386    //     }
387    // }
388
389    #[cfg(not(target_arch = "wasm32"))]
390    pub fn new_fs(path: impl AsRef<Path>) -> Self {
391        let mut default = DefaultStorage::default();
392
393        let path = path.as_ref().to_path_buf();
394        let mut blockstore_path = path.clone();
395        let mut datastore_path = path.clone();
396        let mut lockfile_path = path;
397        blockstore_path.push("blockstore");
398        datastore_path.push("datastore");
399        lockfile_path.push("repo_lock");
400
401        default.set_blockstore_path(blockstore_path);
402        default.set_datastore_path(datastore_path);
403        default.set_lockfile(lockfile_path);
404
405        Self::new_raw(default.clone(), default.clone(), default)
406    }
407
408    pub fn new_memory() -> Self {
409        let default = DefaultStorage::default();
410        Self::new_raw(default.clone(), default.clone(), default)
411    }
412
413    #[cfg(target_arch = "wasm32")]
414    pub fn new_idb(namespace: Option<String>) -> Self {
415        let mut default = DefaultStorage::default();
416        default.set_namespace(namespace);
417        Self::new_raw(default.clone(), default.clone(), default)
418    }
419}
420
421impl<S: RepoTypes> Repo<S> {
422    pub fn new_raw(
423        block_store: S::TBlockStore,
424        data_store: S::TDataStore,
425        lockfile: S::TLock,
426    ) -> Self {
427        let inner = RepoInner {
428            initialized: AtomicBool::default(),
429            online: AtomicBool::default(),
430            block_store,
431            data_store,
432            events: Default::default(),
433            subscriptions: Default::default(),
434            lockfile,
435            max_storage_size: Default::default(),
436            gclock: Default::default(),
437        };
438        Repo {
439            inner: Arc::new(inner),
440        }
441    }
442
443    pub fn set_max_storage_size(&self, size: usize) {
444        self.inner.max_storage_size.store(size, Ordering::SeqCst);
445    }
446
447    pub fn max_storage_size(&self) -> usize {
448        self.inner.max_storage_size.load(Ordering::SeqCst)
449    }
450
451    pub async fn migrate(&self, repo: &Self) -> Result<(), Error> {
452        if self.is_online() || repo.is_online() {
453            anyhow::bail!("Repository cannot be online");
454        }
455        let block_migration = {
456            async move {
457                let mut stream = self.list_blocks().await;
458                while let Some(cid) = stream.next().await {
459                    match self.get_block_now(&cid).await {
460                        Ok(Some(block)) => match repo.inner.block_store.put(&block).await {
461                            Ok(_) => {}
462                            Err(e) => error!("Error migrating {cid}: {e}"),
463                        },
464                        Ok(None) => error!("{cid} doesnt exist"),
465                        Err(e) => error!("Error getting block {cid}: {e}"),
466                    }
467                }
468            }
469        };
470
471        let data_migration = {
472            async move {
473                let mut data_stream = self.data_store().iter().await;
474                while let Some((k, v)) = data_stream.next().await {
475                    if let Err(e) = repo.data_store().put(&k, &v).await {
476                        error!("Unable to migrate {k:?} into repo: {e}");
477                    }
478                }
479            }
480        };
481
482        let pins_migration = {
483            async move {
484                let mut stream = self.data_store().list(None).await;
485                while let Some(Ok((cid, pin_mode))) = stream.next().await {
486                    match pin_mode {
487                        PinMode::Direct => match repo.data_store().insert_direct_pin(&cid).await {
488                            Ok(_) => {}
489                            Err(e) => error!("Unable to migrate pin {cid}: {e}"),
490                        },
491                        PinMode::Indirect => {
492                            //No need to track since we will be obtaining the reference from the pin that is recursive
493                            continue;
494                        }
495                        PinMode::Recursive => {
496                            let block = match self
497                                .get_block_now(&cid)
498                                .await
499                                .map(|block| block.and_then(|block| block.to_ipld().ok()))
500                            {
501                                Ok(Some(block)) => block,
502                                Ok(None) => continue,
503                                Err(e) => {
504                                    error!("Block {cid} does not exist but is pinned: {e}");
505                                    continue;
506                                }
507                            };
508
509                            let st = crate::refs::IpldRefs::default()
510                                .with_only_unique()
511                                .refs_of_resolved(self, vec![(cid, block.clone())].into_iter())
512                                .map_ok(|crate::refs::Edge { destination, .. }| destination)
513                                .into_stream()
514                                .boxed();
515
516                            if let Err(e) = repo.insert_recursive_pin(&cid, st).await {
517                                error!("Error migrating pin {cid}: {e}");
518                                continue;
519                            }
520                        }
521                    }
522                }
523            }
524        };
525
526        futures::join!(block_migration, data_migration, pins_migration);
527
528        Ok(())
529    }
530
531    pub(crate) fn initialize_channel(&self) -> Receiver<RepoEvent> {
532        let mut event_guard = self.inner.events.write();
533        let (sender, receiver) = channel(1);
534        debug_assert!(event_guard.is_none());
535        *event_guard = Some(sender);
536        self.set_online();
537        receiver
538    }
539
540    /// Shutdowns the repo, cancelling any pending subscriptions; Likely going away after some
541    /// refactoring, see notes on [`crate::Ipfs::exit_daemon`].
542    pub fn shutdown(&self) {
543        let mut map = self.inner.subscriptions.lock();
544        map.clear();
545        drop(map);
546        if let Some(mut event) = self.inner.events.write().take() {
547            event.close_channel()
548        }
549        self.set_offline();
550    }
551
552    pub fn is_online(&self) -> bool {
553        self.inner.online.load(Ordering::SeqCst)
554    }
555
556    pub(crate) fn set_online(&self) {
557        if self.is_online() {
558            return;
559        }
560
561        self.inner.online.store(true, Ordering::SeqCst)
562    }
563
564    pub(crate) fn set_offline(&self) {
565        if !self.is_online() {
566            return;
567        }
568
569        self.inner.online.store(false, Ordering::SeqCst)
570    }
571
572    fn repo_channel(&self) -> Option<Sender<RepoEvent>> {
573        self.inner.events.read().clone()
574    }
575
576    pub async fn init(&self) -> Result<(), Error> {
577        //Avoid initializing again
578        if self.inner.initialized.load(Ordering::SeqCst) {
579            return Ok(());
580        }
581        // Dropping the guard (even though not strictly necessary to compile) to avoid potential
582        // deadlocks if `block_store` or `data_store` were to try to access `Repo.lockfile`.
583        {
584            tracing::debug!("Trying lockfile");
585            self.inner.lockfile.try_exclusive()?;
586            tracing::debug!("lockfile tried");
587        }
588
589        self.inner.block_store.init().await?;
590        self.inner.data_store.init().await?;
591        self.inner.initialized.store(true, Ordering::SeqCst);
592        Ok(())
593    }
594
595    /// Puts a block into the block store.
596    pub fn put_block(&self, block: &Block) -> RepoPutBlock<S> {
597        RepoPutBlock::new(self, block).broadcast_on_new_block(true)
598    }
599
600    /// Retrives a block from the block store, or starts fetching it from the network and awaits
601    /// until it has been fetched.
602    #[inline]
603    pub fn get_block<C: Borrow<Cid>>(&self, cid: C) -> RepoGetBlock<S> {
604        RepoGetBlock::new(Repo::clone(self), cid)
605    }
606
607    /// Retrives a set of blocks from the block store, or starts fetching them from the network and awaits
608    /// until it has been fetched.
609    #[inline]
610    pub fn get_blocks(&self, cids: impl IntoIterator<Item = impl Borrow<Cid>>) -> RepoGetBlocks<S> {
611        RepoGetBlocks::new(Repo::clone(self)).blocks(cids)
612    }
613
614    /// Get the size of listed blocks
615    #[inline]
616    pub async fn get_blocks_size(&self, cids: &[Cid]) -> Result<Option<usize>, Error> {
617        self.inner.block_store.size(cids).await
618    }
619
620    /// Get the total size of the block store
621    #[inline]
622    pub async fn get_total_size(&self) -> Result<usize, Error> {
623        self.inner.block_store.total_size().await
624    }
625
626    /// Retrieves a block from the block store if it's available locally.
627    pub async fn get_block_now<C: Borrow<Cid>>(&self, cid: C) -> Result<Option<Block>, Error> {
628        let cid = cid.borrow();
629        self.inner.block_store.get(cid).await
630    }
631
632    /// Check to determine if blockstore contain a block
633    pub async fn contains<C: Borrow<Cid>>(&self, cid: C) -> Result<bool, Error> {
634        let cid = cid.borrow();
635        self.inner.block_store.contains(cid).await
636    }
637
638    /// Lists the blocks in the blockstore.
639    pub async fn list_blocks(&self) -> BoxStream<'static, Cid> {
640        self.inner.block_store.list().await
641    }
642
643    /// Remove block from the block store.
644    pub async fn remove_block<C: Borrow<Cid>>(
645        &self,
646        cid: C,
647        recursive: bool,
648    ) -> Result<Vec<Cid>, Error> {
649        let _guard = self.inner.gclock.read().await;
650        let cid = cid.borrow();
651        if self.is_pinned(cid).await? {
652            return Err(anyhow::anyhow!("block to remove is pinned"));
653        }
654
655        let list = match recursive {
656            true => {
657                let mut list = self.recursive_collections(*cid).await;
658                // ensure the first root block is apart of the list
659                list.insert(*cid);
660                list
661            }
662            false => BTreeSet::from_iter(std::iter::once(*cid)),
663        };
664
665        let list = stream::iter(
666            FuturesOrdered::from_iter(list.into_iter().map(|cid| async move { cid }))
667                .filter_map(|cid| async move {
668                    (!self.is_pinned(&cid).await.unwrap_or_default()).then_some(cid)
669                })
670                .collect::<Vec<Cid>>()
671                .await,
672        )
673        .boxed();
674
675        let removed = self
676            .inner
677            .block_store
678            .remove_many(list)
679            .await
680            .collect::<Vec<_>>()
681            .await;
682
683        for cid in &removed {
684            // notify ipfs task about the removed blocks
685            if let Some(mut events) = self.repo_channel() {
686                let _ = events.send(RepoEvent::RemovedBlock(*cid)).await;
687            }
688        }
689        Ok(removed)
690    }
691
692    fn recursive_collections(&self, cid: Cid) -> BoxFuture<'_, BTreeSet<Cid>> {
693        async move {
694            let block = match self.get_block_now(&cid).await {
695                Ok(Some(block)) => block,
696                _ => return BTreeSet::default(),
697            };
698
699            let mut references: BTreeSet<Cid> = BTreeSet::new();
700            if block.references(&mut references).is_err() {
701                return BTreeSet::default();
702            }
703
704            let mut list = BTreeSet::new();
705
706            for cid in &references {
707                let mut inner_list = self.recursive_collections(*cid).await;
708                list.append(&mut inner_list);
709            }
710
711            references.append(&mut list);
712            references
713        }
714        .boxed()
715    }
716
717    /// Pins a given Cid recursively or directly (non-recursively).
718    ///
719    /// Pins on a block are additive in sense that a previously directly (non-recursively) pinned
720    /// can be made recursive, but removing the recursive pin on the block removes also the direct
721    /// pin as well.
722    ///
723    /// Pinning a Cid recursively (for supported dag-protobuf and dag-cbor) will walk its
724    /// references and pin the references indirectly. When a Cid is pinned indirectly it will keep
725    /// its previous direct or recursive pin and be indirect in addition.
726    ///
727    /// Recursively pinned Cids cannot be re-pinned non-recursively but non-recursively pinned Cids
728    /// can be "upgraded to" being recursively pinned.
729    pub fn pin<C: Borrow<Cid>>(&self, cid: C) -> RepoInsertPin<S> {
730        RepoInsertPin::new(Repo::clone(self), cid)
731    }
732
733    /// Unpins a given Cid recursively or only directly.
734    ///
735    /// Recursively unpinning a previously only directly pinned Cid will remove the direct pin.
736    ///
737    /// Unpinning an indirectly pinned Cid is not possible other than through its recursively
738    /// pinned tree roots.
739    pub fn remove_pin<C: Borrow<Cid>>(&self, cid: C) -> RepoRemovePin<S> {
740        RepoRemovePin::new(Repo::clone(self), cid)
741    }
742
743    pub fn fetch<C: Borrow<Cid>>(&self, cid: C) -> RepoFetch<S> {
744        RepoFetch::new(Repo::clone(self), cid)
745    }
746
747    /// Pins a given Cid recursively or directly (non-recursively).
748    pub(crate) async fn insert_pin(
749        &self,
750        cid: &Cid,
751        recursive: bool,
752        local_only: bool,
753    ) -> Result<(), Error> {
754        let mut pin_fut = self.pin(cid);
755        if recursive {
756            pin_fut = pin_fut.recursive();
757        }
758        if local_only {
759            pin_fut = pin_fut.local();
760        }
761        pin_fut.await
762    }
763
764    /// Inserts a direct pin for a `Cid`.
765    pub(crate) async fn insert_direct_pin(&self, cid: &Cid) -> Result<(), Error> {
766        self.inner.data_store.insert_direct_pin(cid).await
767    }
768
769    /// Inserts a recursive pin for a `Cid`.
770    pub(crate) async fn insert_recursive_pin(
771        &self,
772        cid: &Cid,
773        refs: References<'_>,
774    ) -> Result<(), Error> {
775        self.inner.data_store.insert_recursive_pin(cid, refs).await
776    }
777
778    /// Removes a direct pin for a `Cid`.
779    pub(crate) async fn remove_direct_pin(&self, cid: &Cid) -> Result<(), Error> {
780        self.inner.data_store.remove_direct_pin(cid).await
781    }
782
783    /// Removes a recursive pin for a `Cid`.
784    pub(crate) async fn remove_recursive_pin(
785        &self,
786        cid: &Cid,
787        refs: References<'_>,
788    ) -> Result<(), Error> {
789        // FIXME: not really sure why is there not an easier way to to transfer control
790        self.inner.data_store.remove_recursive_pin(cid, refs).await
791    }
792
793    /// Function to perform a basic cleanup of unpinned blocks
794    pub(crate) async fn cleanup(&self) -> Result<Vec<Cid>, Error> {
795        let repo = self.clone();
796
797        let blocks = repo.list_blocks().await;
798        let pins = repo
799            .list_pins(None)
800            .await
801            .filter_map(|result| futures::future::ready(result.map(|(cid, _)| cid).ok()))
802            .collect::<Vec<_>>()
803            .await;
804
805        let stream = async_stream::stream! {
806            for await cid in blocks {
807                if pins.contains(&cid) {
808                    continue;
809                }
810                yield cid;
811            }
812        }
813        .boxed();
814
815        let removed_blocks = self
816            .inner
817            .block_store
818            .remove_many(stream)
819            .await
820            .collect::<Vec<_>>()
821            .await;
822
823        Ok(removed_blocks)
824    }
825
826    /// Checks if a `Cid` is pinned.
827    pub async fn is_pinned<C: Borrow<Cid>>(&self, cid: C) -> Result<bool, Error> {
828        let cid = cid.borrow();
829        self.inner.data_store.is_pinned(cid).await
830    }
831
832    pub async fn list_pins(
833        &self,
834        mode: impl Into<Option<PinMode>>,
835    ) -> BoxStream<'static, Result<(Cid, PinMode), Error>> {
836        let mode = mode.into();
837        self.inner.data_store.list(mode).await
838    }
839
840    pub async fn query_pins(
841        &self,
842        cids: Vec<Cid>,
843        requirement: impl Into<Option<PinMode>>,
844    ) -> Result<Vec<(Cid, PinKind<Cid>)>, Error> {
845        let requirement = requirement.into();
846        self.inner.data_store.query(cids, requirement).await
847    }
848}
849
850pub struct GCGuard<'a> {
851    _g: RwLockReadGuard<'a, ()>,
852}
853
854impl<S: RepoTypes> Repo<S> {
855    /// Hold a guard to prevent GC from running until this guard has dropped
856    /// Note: Until this guard drops, the GC task, if enabled, would not perform any cleanup.
857    ///       If the GC task is running, this guard will await until GC finishes
858    pub async fn gc_guard(&self) -> GCGuard<'_> {
859        let _g = self.inner.gclock.read().await;
860        GCGuard { _g }
861    }
862
863    pub fn data_store(&self) -> &S::TDataStore {
864        &self.inner.data_store
865    }
866}
867
868pub struct RepoGetBlock<S: RepoTypes> {
869    instance: RepoGetBlocks<S>,
870}
871
872impl<S: RepoTypes> RepoGetBlock<S> {
873    pub fn new(repo: Repo<S>, cid: impl Borrow<Cid>) -> Self {
874        let instance = RepoGetBlocks::new(repo).block(cid);
875        Self { instance }
876    }
877
878    pub fn span(mut self, span: impl Borrow<Span>) -> Self {
879        self.instance = self.instance.span(span);
880        self
881    }
882
883    pub fn timeout(mut self, timeout: impl Into<Option<Duration>>) -> Self {
884        self.instance = self.instance.timeout(timeout);
885        self
886    }
887
888    pub fn local(mut self) -> Self {
889        self.instance = self.instance.local();
890        self
891    }
892
893    pub fn set_local(mut self, local: bool) -> Self {
894        self.instance = self.instance.set_local(local);
895        self
896    }
897
898    /// Peer that may contain the block
899    pub fn provider(mut self, peer_id: PeerId) -> Self {
900        self.instance = self.instance.provider(peer_id);
901        self
902    }
903
904    /// List of peers that may contain the block
905    pub fn providers(mut self, providers: impl IntoIterator<Item = impl Borrow<PeerId>>) -> Self {
906        self.instance = self.instance.providers(providers);
907        self
908    }
909}
910
911impl<S: RepoTypes> Future for RepoGetBlock<S> {
912    type Output = Result<Block, Error>;
913    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
914        let this = &mut self;
915        match futures::ready!(this.instance.poll_next_unpin(cx)) {
916            Some(result) => Poll::Ready(result),
917            None => Poll::Ready(Err(anyhow::anyhow!("block does not exist"))),
918        }
919    }
920}
921
922pub struct RepoGetBlocks<S: RepoTypes> {
923    repo: Option<Repo<S>>,
924    cids: IndexSet<Cid>,
925    providers: IndexSet<PeerId>,
926    local: bool,
927    span: Span,
928    timeout: Option<Duration>,
929    stream: Option<BoxStream<'static, Result<Block, Error>>>,
930}
931
932impl<S: RepoTypes> RepoGetBlocks<S> {
933    pub fn new(repo: Repo<S>) -> Self {
934        Self {
935            repo: Some(repo),
936            cids: IndexSet::new(),
937            providers: IndexSet::new(),
938            local: false,
939            span: Span::current(),
940            timeout: None,
941            stream: None,
942        }
943    }
944
945    pub fn blocks(mut self, cids: impl IntoIterator<Item = impl Borrow<Cid>>) -> Self {
946        self.cids.extend(cids.into_iter().map(|cid| *cid.borrow()));
947        self
948    }
949
950    pub fn block(self, cid: impl Borrow<Cid>) -> Self {
951        self.blocks([cid])
952    }
953
954    pub fn span(mut self, span: impl Borrow<Span>) -> Self {
955        let span = span.borrow();
956        self.span = span.clone();
957        self
958    }
959
960    pub fn timeout(mut self, timeout: impl Into<Option<Duration>>) -> Self {
961        self.timeout = timeout.into();
962        self
963    }
964
965    pub fn local(mut self) -> Self {
966        self.local = true;
967        self
968    }
969
970    pub fn set_local(mut self, local: bool) -> Self {
971        self.local = local;
972        self
973    }
974
975    pub fn provider(self, peer_id: impl Borrow<PeerId>) -> Self {
976        self.providers([peer_id])
977    }
978
979    pub fn providers(mut self, providers: impl IntoIterator<Item = impl Borrow<PeerId>>) -> Self {
980        self.providers
981            .extend(providers.into_iter().map(|k| *k.borrow()));
982        self
983    }
984}
985
986impl<S: RepoTypes> Stream for RepoGetBlocks<S> {
987    type Item = Result<Block, Error>;
988
989    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
990        if self.stream.is_none() && self.repo.is_none() {
991            return Poll::Ready(None);
992        }
993
994        let this = &mut *self;
995
996        loop {
997            match &mut this.stream {
998                Some(stream) => {
999                    let _g = this.span.enter();
1000                    match futures::ready!(stream.poll_next_unpin(cx)) {
1001                        Some(item) => return Poll::Ready(Some(item)),
1002                        None => {
1003                            this.stream.take();
1004                            return Poll::Ready(None);
1005                        }
1006                    }
1007                }
1008                None => {
1009                    let repo = this.repo.take().expect("valid repo instance");
1010                    let providers = std::mem::take(&mut this.providers);
1011                    let cids = std::mem::take(&mut this.cids);
1012                    let local_only = this.local;
1013                    let timeout = this.timeout;
1014
1015                    let st = async_stream::stream! {
1016                        let _guard = repo.gc_guard().await;
1017                        let mut missing: IndexSet<Cid> = cids.clone();
1018                        for cid in &cids {
1019                            if let Ok(Some(block)) = repo.get_block_now(cid).await {
1020                                yield Ok(block);
1021                                missing.shift_remove(cid);
1022                            }
1023                        }
1024
1025                        if missing.is_empty() {
1026                            return;
1027                        }
1028
1029                        if local_only || !repo.is_online() {
1030                            yield Err(anyhow::anyhow!("Unable to locate missing blocks {missing:?}"));
1031                            return;
1032                        }
1033
1034                        let mut events = match repo.repo_channel() {
1035                            Some(events) => events,
1036                            None => {
1037                                yield Err(anyhow::anyhow!("Channel is not available"));
1038                                return;
1039                            }
1040                        };
1041
1042                        let mut notified: HashMap<Cid, Vec<_>> = HashMap::new();
1043
1044                        let timeout = timeout.or(Some(Duration::from_secs(60)));
1045
1046                        let mut blocks = FuturesOrdered::new();
1047
1048                        for cid in &missing {
1049                            let cid = *cid;
1050                            let (tx, rx) = futures::channel::oneshot::channel();
1051                            repo.inner
1052                                .subscriptions
1053                                .lock()
1054                                .entry(cid)
1055                                .or_default()
1056                                .push(tx);
1057
1058                            let mut events = events.clone();
1059                            let signal = Arc::new(Notify::new());
1060                            let s2 = signal.clone();
1061                            let task = async move {
1062                                let block_fut = rx;
1063                                let notified_fut = signal.notified();
1064                                futures::pin_mut!(notified_fut);
1065
1066                                match futures::future::select(block_fut, notified_fut).await {
1067                                    Either::Left((Ok(Ok(block)), _)) => Ok::<_, Error>(block),
1068                                    Either::Left((Ok(Err(e)), _)) => Err::<_, Error>(anyhow::anyhow!("{e}")),
1069                                    Either::Left((Err(e), _)) => Err::<_, Error>(e.into()),
1070                                    Either::Right(((), _)) => {
1071                                        Err::<_, Error>(anyhow::anyhow!("request for {cid} has been cancelled"))
1072                                    }
1073                                }
1074                            }
1075                            .map_err(move |e| {
1076                                // Although the request would eventually be canceled if timeout or canceled, we can still signal to swarm
1077                                // about the block being unwanted for future changes.
1078                                _ = events.try_send(RepoEvent::UnwantBlock(cid));
1079                                e
1080                            })
1081                            .boxed();
1082
1083                            notified.entry(cid).or_default().push(s2);
1084
1085                            blocks.push_back(task);
1086                        }
1087
1088                        events
1089                            .send(RepoEvent::WantBlock(
1090                                Vec::from_iter(missing),
1091                                Vec::from_iter(providers),
1092                                timeout,
1093                                Some(notified),
1094                            ))
1095                            .await
1096                            .ok();
1097
1098                        for await block in blocks {
1099                            yield block
1100                        }
1101                    };
1102
1103                    this.stream.replace(Box::pin(st));
1104                }
1105            }
1106        }
1107    }
1108}
1109
1110impl<S: RepoTypes> IntoFuture for RepoGetBlocks<S> {
1111    type Output = Result<Vec<Block>, Error>;
1112    type IntoFuture = BoxFuture<'static, Self::Output>;
1113    fn into_future(self) -> Self::IntoFuture {
1114        async move {
1115            let col = self.try_collect().await?;
1116            Ok(col)
1117        }
1118        .boxed()
1119    }
1120}
1121
1122pub struct RepoPutBlock<S: RepoTypes> {
1123    repo: Repo<S>,
1124    block: Option<Block>,
1125    span: Option<Span>,
1126    broadcast_on_new_block: bool,
1127}
1128
1129impl<S: RepoTypes> RepoPutBlock<S> {
1130    fn new(repo: &Repo<S>, block: &Block) -> Self {
1131        let block = Some(block.clone());
1132        Self {
1133            repo: Repo::clone(repo),
1134            block,
1135            span: None,
1136            broadcast_on_new_block: true,
1137        }
1138    }
1139
1140    pub fn broadcast_on_new_block(mut self, v: bool) -> Self {
1141        self.broadcast_on_new_block = v;
1142        self
1143    }
1144
1145    pub fn span(mut self, span: Span) -> Self {
1146        self.span = Some(span);
1147        self
1148    }
1149}
1150
1151impl<S: RepoTypes> IntoFuture for RepoPutBlock<S> {
1152    type IntoFuture = BoxFuture<'static, Self::Output>;
1153    type Output = Result<Cid, Error>;
1154    fn into_future(mut self) -> Self::IntoFuture {
1155        let block = self.block.take().expect("valid block is set");
1156        let span = self.span.unwrap_or(Span::current());
1157        let span = debug_span!(parent: &span, "put_block", cid = %block.cid());
1158        async move {
1159            let _guard = self.repo.inner.gclock.read().await;
1160            let (cid, res) = self.repo.inner.block_store.put(&block).await?;
1161
1162            if let BlockPut::NewBlock = res {
1163                if self.broadcast_on_new_block {
1164                    if let Some(mut event) = self.repo.repo_channel() {
1165                        _ = event.send(RepoEvent::NewBlock(block.clone())).await;
1166                    }
1167                }
1168                let list = self.repo.inner.subscriptions.lock().remove(&cid);
1169                if let Some(mut list) = list {
1170                    for ch in list.drain(..) {
1171                        let block = block.clone();
1172                        let _ = ch.send(Ok(block));
1173                    }
1174                }
1175            }
1176
1177            Ok(cid)
1178        }
1179        .instrument(span)
1180        .boxed()
1181    }
1182}
1183
1184pub struct RepoFetch<S: RepoTypes> {
1185    repo: Repo<S>,
1186    cid: Cid,
1187    span: Option<Span>,
1188    providers: Vec<PeerId>,
1189    recursive: bool,
1190    timeout: Option<Duration>,
1191    refs: crate::refs::IpldRefs,
1192}
1193
1194impl<S: RepoTypes> RepoFetch<S> {
1195    pub fn new<C: Borrow<Cid>>(repo: Repo<S>, cid: C) -> Self {
1196        let cid = cid.borrow();
1197        Self {
1198            repo,
1199            cid: *cid,
1200            recursive: false,
1201            providers: vec![],
1202            timeout: None,
1203            refs: Default::default(),
1204            span: None,
1205        }
1206    }
1207
1208    /// Fetch blocks recursively
1209    pub fn recursive(mut self) -> Self {
1210        self.recursive = true;
1211        self
1212    }
1213
1214    /// Peer that may contain the block
1215    pub fn provider(mut self, peer_id: PeerId) -> Self {
1216        if !self.providers.contains(&peer_id) {
1217            self.providers.push(peer_id);
1218        }
1219        self
1220    }
1221
1222    /// List of peers that may contain the block
1223    pub fn providers(mut self, providers: &[PeerId]) -> Self {
1224        self.providers = providers.to_vec();
1225        self
1226    }
1227
1228    /// Fetch blocks to a specific depth
1229    pub fn depth(mut self, depth: u64) -> Self {
1230        self.refs = self.refs.with_max_depth(depth);
1231        self
1232    }
1233
1234    /// Duration to fetch the block from the network before
1235    /// timing out
1236    pub fn timeout(mut self, duration: Duration) -> Self {
1237        self.timeout.replace(duration);
1238        self.refs = self.refs.with_timeout(duration);
1239        self
1240    }
1241
1242    pub fn exit_on_error(mut self) -> Self {
1243        self.refs = self.refs.with_exit_on_error();
1244        self
1245    }
1246
1247    /// Set tracing span
1248    pub fn span(mut self, span: Span) -> Self {
1249        self.span = Some(span);
1250        self
1251    }
1252}
1253
1254impl<S: RepoTypes> IntoFuture for RepoFetch<S> {
1255    type Output = Result<(), Error>;
1256
1257    type IntoFuture = BoxFuture<'static, Self::Output>;
1258
1259    fn into_future(self) -> Self::IntoFuture {
1260        let cid = self.cid;
1261        let span = self.span.unwrap_or(Span::current());
1262        let recursive = self.recursive;
1263        let repo = self.repo;
1264        let span = debug_span!(parent: &span, "fetch", cid = %cid, recursive);
1265        let providers = self.providers;
1266        let timeout = self.timeout;
1267        async move {
1268            // Although getting a block adds a guard, we will add a read guard here a head of time so we can hold it throughout this future
1269            let _g = repo.inner.gclock.read().await;
1270            let block = repo
1271                .get_block(cid)
1272                .providers(&providers)
1273                .timeout(timeout)
1274                .await?;
1275
1276            if !recursive {
1277                return Ok(());
1278            }
1279            let ipld = block.to_ipld()?;
1280
1281            let mut st = self
1282                .refs
1283                .with_only_unique()
1284                .providers(&providers)
1285                .refs_of_resolved(&repo, vec![(cid, ipld.clone())])
1286                .map_ok(|crate::refs::Edge { destination, .. }| destination)
1287                .into_stream()
1288                .boxed();
1289
1290            while let Some(_c) = st.try_next().await? {}
1291
1292            Ok(())
1293        }
1294        .instrument(span)
1295        .boxed()
1296    }
1297}
1298
1299pub struct RepoInsertPin<S: RepoTypes> {
1300    repo: Repo<S>,
1301    cid: Cid,
1302    span: Option<Span>,
1303    providers: Vec<PeerId>,
1304    recursive: bool,
1305    timeout: Option<Duration>,
1306    local: bool,
1307    refs: crate::refs::IpldRefs,
1308}
1309
1310impl<S: RepoTypes> RepoInsertPin<S> {
1311    pub fn new<C: Borrow<Cid>>(repo: Repo<S>, cid: C) -> Self {
1312        let cid = cid.borrow();
1313        Self {
1314            repo,
1315            cid: *cid,
1316            recursive: false,
1317            providers: vec![],
1318            local: false,
1319            timeout: None,
1320            refs: Default::default(),
1321            span: None,
1322        }
1323    }
1324
1325    /// Recursively pin blocks
1326    pub fn recursive(mut self) -> Self {
1327        self.recursive = true;
1328        self
1329    }
1330
1331    /// Pin local blocks only
1332    pub fn local(mut self) -> Self {
1333        self.local = true;
1334        self.refs = self.refs.with_existing_blocks();
1335        self
1336    }
1337
1338    /// Set a flag to pin local blocks only
1339    pub fn set_local(mut self, local: bool) -> Self {
1340        self.local = local;
1341        if local {
1342            self.refs = self.refs.with_existing_blocks();
1343        }
1344        self
1345    }
1346
1347    /// Peer that may contain the block to pin
1348    pub fn provider(mut self, peer_id: PeerId) -> Self {
1349        if !self.providers.contains(&peer_id) {
1350            self.providers.push(peer_id);
1351        }
1352        self
1353    }
1354
1355    /// List of peers that may contain the block to pin
1356    pub fn providers(mut self, providers: &[PeerId]) -> Self {
1357        self.providers = providers.into();
1358        self
1359    }
1360
1361    /// Pin to a specific depth of the graph
1362    pub fn depth(mut self, depth: u64) -> Self {
1363        self.refs = self.refs.with_max_depth(depth);
1364        self
1365    }
1366
1367    /// Duration to fetch the block from the network before
1368    /// timing out
1369    pub fn timeout(mut self, duration: Duration) -> Self {
1370        self.timeout.replace(duration);
1371        self.refs = self.refs.with_timeout(duration);
1372        self
1373    }
1374
1375    pub fn exit_on_error(mut self) -> Self {
1376        self.refs = self.refs.with_exit_on_error();
1377        self
1378    }
1379
1380    /// Set tracing span
1381    pub fn span(mut self, span: Span) -> Self {
1382        self.span = Some(span);
1383        self
1384    }
1385}
1386
1387impl<S: RepoTypes> IntoFuture for RepoInsertPin<S> {
1388    type Output = Result<(), Error>;
1389
1390    type IntoFuture = BoxFuture<'static, Self::Output>;
1391
1392    fn into_future(self) -> Self::IntoFuture {
1393        let cid = self.cid;
1394        let local = self.local;
1395        let span = self.span.unwrap_or(Span::current());
1396        let recursive = self.recursive;
1397        let repo = self.repo;
1398        let span = debug_span!(parent: &span, "insert_pin", cid = %cid, recursive);
1399        let providers = self.providers;
1400        let timeout = self.timeout;
1401        async move {
1402            // Although getting a block adds a guard, we will add a read guard here a head of time so we can hold it throughout this future
1403            let _g = repo.inner.gclock.read().await;
1404            let block = repo
1405                .get_block(cid)
1406                .providers(&providers)
1407                .set_local(local)
1408                .timeout(timeout)
1409                .await?;
1410
1411            if !recursive {
1412                repo.insert_direct_pin(&cid).await?
1413            } else {
1414                let ipld = block.to_ipld()?;
1415
1416                let st = self
1417                    .refs
1418                    .with_only_unique()
1419                    .providers(&providers)
1420                    .refs_of_resolved(&repo, vec![(cid, ipld.clone())])
1421                    .map_ok(|crate::refs::Edge { destination, .. }| destination)
1422                    .into_stream()
1423                    .boxed();
1424
1425                repo.insert_recursive_pin(&cid, st).await?
1426            }
1427            Ok(())
1428        }
1429        .instrument(span)
1430        .boxed()
1431    }
1432}
1433
1434pub struct RepoRemovePin<S: RepoTypes> {
1435    repo: Repo<S>,
1436    cid: Cid,
1437    span: Option<Span>,
1438    recursive: bool,
1439    refs: crate::refs::IpldRefs,
1440}
1441
1442impl<S: RepoTypes> RepoRemovePin<S> {
1443    pub fn new<C: Borrow<Cid>>(repo: Repo<S>, cid: C) -> Self {
1444        let cid = cid.borrow();
1445        Self {
1446            repo,
1447            cid: *cid,
1448            recursive: false,
1449            refs: Default::default(),
1450            span: None,
1451        }
1452    }
1453
1454    /// Recursively unpin blocks
1455    pub fn recursive(mut self) -> Self {
1456        self.recursive = true;
1457        self
1458    }
1459
1460    /// Set tracing span
1461    pub fn span(mut self, span: Span) -> Self {
1462        self.span = Some(span);
1463        self
1464    }
1465}
1466
1467impl<S: RepoTypes> IntoFuture for RepoRemovePin<S> {
1468    type Output = Result<(), Error>;
1469
1470    type IntoFuture = BoxFuture<'static, Self::Output>;
1471
1472    fn into_future(self) -> Self::IntoFuture {
1473        let cid = self.cid;
1474        let span = self.span.unwrap_or(Span::current());
1475        let recursive = self.recursive;
1476        let repo = self.repo;
1477
1478        let span = debug_span!(parent: &span, "remove_pin", cid = %cid, recursive);
1479        async move {
1480            let _g = repo.inner.gclock.read().await;
1481            if !recursive {
1482                repo.remove_direct_pin(&cid).await
1483            } else {
1484                // start walking refs of the root after loading it
1485
1486                let block = match repo.get_block_now(&cid).await? {
1487                    Some(b) => b,
1488                    None => {
1489                        return Err(anyhow::anyhow!("pinned root not found: {}", cid));
1490                    }
1491                };
1492
1493                let ipld = block.to_ipld()?;
1494                let st = self
1495                    .refs
1496                    .with_only_unique()
1497                    .with_existing_blocks()
1498                    .refs_of_resolved(&repo, vec![(cid, ipld)])
1499                    .map_ok(|crate::refs::Edge { destination, .. }| destination)
1500                    .into_stream()
1501                    .boxed();
1502
1503                repo.remove_recursive_pin(&cid, st).await
1504            }
1505        }
1506        .instrument(span)
1507        .boxed()
1508    }
1509}