rust_ipfs/repo/
mod.rs

1//! Storage implementation(s) backing the [`crate::Ipfs`].
2use crate::error::Error;
3use crate::{Block, StorageType};
4use async_trait::async_trait;
5use core::fmt::Debug;
6use futures::channel::mpsc::{channel, Receiver, Sender};
7use futures::future::{BoxFuture, Either};
8use futures::sink::SinkExt;
9use futures::stream::{self, BoxStream, FuturesOrdered};
10use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt};
11use indexmap::IndexSet;
12use ipld_core::cid::Cid;
13use libp2p::identity::PeerId;
14use parking_lot::{Mutex, RwLock};
15use std::borrow::Borrow;
16use std::collections::{BTreeSet, HashMap};
17use std::future::{Future, IntoFuture};
18#[allow(unused_imports)]
19use std::path::Path;
20use std::pin::Pin;
21use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
22use std::sync::Arc;
23use std::task::{Context, Poll};
24use std::time::Duration;
25use std::{error, fmt, io};
26use tokio::sync::{Notify, RwLockReadGuard};
27use tracing::{log, Instrument, Span};
28
29#[macro_use]
30#[cfg(test)]
31mod common_tests;
32
33pub mod blockstore;
34pub mod datastore;
35pub mod lock;
36
37/// Path mangling done for pins and blocks
38#[cfg(not(target_arch = "wasm32"))]
39pub(crate) mod paths;
40
41/// Describes the outcome of `BlockStore::put_block`.
42#[derive(Debug, PartialEq, Eq)]
43pub enum BlockPut {
44    /// A new block was written to the blockstore.
45    NewBlock,
46    /// The block already exists.
47    Existed,
48}
49
50/// Describes the outcome of `BlockStore::remove`.
51#[derive(Debug)]
52pub enum BlockRm {
53    /// A block was successfully removed from the blockstore.
54    Removed(Cid),
55    // TODO: DownloadCancelled(Cid, Duration),
56}
57
58// pub struct BlockNotFound(Cid);
59/// Describes the error variants for `BlockStore::remove`.
60#[derive(Debug)]
61pub enum BlockRmError {
62    // TODO: Pinned(Cid),
63    /// The `Cid` doesn't correspond to a block in the blockstore.
64    NotFound(Cid),
65}
66
67/// This API is being discussed and evolved, which will likely lead to breakage.
68#[async_trait]
69pub trait BlockStore: Debug + Send + Sync {
70    async fn init(&self) -> Result<(), Error>;
71
72    #[deprecated]
73    async fn open(&self) -> Result<(), Error> {
74        Ok(())
75    }
76    /// Returns whether a block is present in the blockstore.
77    async fn contains(&self, cid: &Cid) -> Result<bool, Error>;
78    /// Returns a block from the blockstore.
79    async fn get(&self, cid: &Cid) -> Result<Option<Block>, Error>;
80    /// Get the size of a single block
81    async fn size(&self, cid: &[Cid]) -> Result<Option<usize>, Error>;
82    /// Get a total size of the block store
83    async fn total_size(&self) -> Result<usize, Error>;
84    /// Inserts a block in the blockstore.
85    async fn put(&self, block: &Block) -> Result<(Cid, BlockPut), Error>;
86    /// Removes a block from the blockstore.
87    async fn remove(&self, cid: &Cid) -> Result<(), Error>;
88    /// Remove multiple blocks from the blockstore
89    async fn remove_many(&self, blocks: BoxStream<'static, Cid>) -> BoxStream<'static, Cid>;
90    /// Returns a list of the blocks (Cids), in the blockstore.
91    async fn list(&self) -> BoxStream<'static, Cid>;
92}
93
94#[async_trait]
95/// Generic layer of abstraction for a key-value data store.
96pub trait DataStore: PinStore + Debug + Send + Sync {
97    async fn init(&self) -> Result<(), Error>;
98    #[deprecated]
99    async fn open(&self) -> Result<(), Error> {
100        Ok(())
101    }
102    /// Checks if a key is present in the datastore.
103    async fn contains(&self, key: &[u8]) -> Result<bool, Error>;
104    /// Returns the value associated with a key from the datastore.
105    async fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Error>;
106    /// Puts the value under the key in the datastore.
107    async fn put(&self, key: &[u8], value: &[u8]) -> Result<(), Error>;
108    /// Removes a key-value pair from the datastore.
109    async fn remove(&self, key: &[u8]) -> Result<(), Error>;
110    /// Iterate over the k/v of the datastore
111    async fn iter(&self) -> futures::stream::BoxStream<'static, (Vec<u8>, Vec<u8>)>;
112}
113
114#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
115pub struct GCConfig {
116    /// How long until GC runs
117    /// If duration is not set, it will not run at a timer
118    pub duration: Duration,
119
120    /// What will trigger GC
121    pub trigger: GCTrigger,
122}
123
124impl Default for GCConfig {
125    fn default() -> Self {
126        Self {
127            duration: Duration::from_secs(60 * 60),
128            trigger: GCTrigger::default(),
129        }
130    }
131}
132
133#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, PartialOrd, Ord)]
134pub enum GCTrigger {
135    /// At a specific size. If the size is at or exceeds, it will trigger GC
136    At {
137        size: usize,
138    },
139
140    AtStorage,
141
142    /// No trigger
143    #[default]
144    None,
145}
146
147/// Errors variants describing the possible failures for `Lock::try_exclusive`.
148#[derive(Debug)]
149pub enum LockError {
150    RepoInUse,
151    LockFileOpenFailed(io::Error),
152}
153
154impl fmt::Display for LockError {
155    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
156        let msg = match self {
157            LockError::RepoInUse => "The repository is already being used by an IPFS instance.",
158            LockError::LockFileOpenFailed(_) => "Failed to open repository lock file.",
159        };
160
161        write!(f, "{msg}")
162    }
163}
164
165impl From<io::Error> for LockError {
166    fn from(error: io::Error) -> Self {
167        match error.kind() {
168            // `WouldBlock` is not used by `OpenOptions` (this could change), and can therefore be
169            // matched on for the fs2 error in `FsLock::try_exclusive`.
170            io::ErrorKind::WouldBlock => LockError::RepoInUse,
171            _ => LockError::LockFileOpenFailed(error),
172        }
173    }
174}
175
176impl error::Error for LockError {
177    fn source(&self) -> Option<&(dyn error::Error + 'static)> {
178        if let Self::LockFileOpenFailed(error) = self {
179            Some(error)
180        } else {
181            None
182        }
183    }
184}
185
186/// A trait for describing repository locking.
187///
188/// This ensures no two IPFS nodes can be started with the same peer ID, as exclusive access to the
189/// repository is guarenteed. This is most useful when using an fs backed repo.
190pub trait Lock: Debug + Send + Sync {
191    // fn new(path: PathBuf) -> Self;
192    fn try_exclusive(&self) -> Result<(), LockError>;
193}
194
195type References<'a> = BoxStream<'a, Result<Cid, crate::refs::IpldRefsError>>;
196
197#[async_trait]
198pub trait PinStore: Debug + Send + Sync {
199    async fn is_pinned(&self, block: &Cid) -> Result<bool, Error>;
200
201    async fn insert_direct_pin(&self, target: &Cid) -> Result<(), Error>;
202
203    async fn insert_recursive_pin(
204        &self,
205        target: &Cid,
206        referenced: References<'_>,
207    ) -> Result<(), Error>;
208
209    async fn remove_direct_pin(&self, target: &Cid) -> Result<(), Error>;
210
211    async fn remove_recursive_pin(
212        &self,
213        target: &Cid,
214        referenced: References<'_>,
215    ) -> Result<(), Error>;
216
217    async fn list(
218        &self,
219        mode: Option<PinMode>,
220    ) -> futures::stream::BoxStream<'static, Result<(Cid, PinMode), Error>>;
221
222    // here we should have resolved ids
223    // go-ipfs: doesnt start fetching the paths
224    // js-ipfs: starts fetching paths
225    // FIXME: there should probably be an additional Result<$inner, Error> here; the per pin error
226    // is serde OR cid::Error.
227    /// Returns error if any of the ids isn't pinned in the required type, otherwise returns
228    /// the pin details if all of the cids are pinned in one way or the another.
229    async fn query(
230        &self,
231        ids: Vec<Cid>,
232        requirement: Option<PinMode>,
233    ) -> Result<Vec<(Cid, PinKind<Cid>)>, Error>;
234}
235
236/// `PinMode` is the description of pin type for quering purposes.
237#[derive(Debug, PartialEq, Eq, Clone, Copy)]
238pub enum PinMode {
239    Indirect,
240    Direct,
241    Recursive,
242}
243
244/// Helper for around the quite confusing test required in [`PinStore::list`] and
245/// [`PinStore::query`].
246#[derive(Debug, Clone, Copy)]
247enum PinModeRequirement {
248    Only(PinMode),
249    Any,
250}
251
252impl From<Option<PinMode>> for PinModeRequirement {
253    fn from(filter: Option<PinMode>) -> Self {
254        match filter {
255            Some(one) => PinModeRequirement::Only(one),
256            None => PinModeRequirement::Any,
257        }
258    }
259}
260
261#[allow(dead_code)]
262impl PinModeRequirement {
263    fn is_indirect_or_any(&self) -> bool {
264        use PinModeRequirement::*;
265        match self {
266            Only(PinMode::Indirect) | Any => true,
267            Only(_) => false,
268        }
269    }
270
271    fn matches<P: PartialEq<PinMode>>(&self, other: &P) -> bool {
272        use PinModeRequirement::*;
273        match self {
274            Only(one) if other == one => true,
275            Only(_) => false,
276            Any => true,
277        }
278    }
279
280    fn required(&self) -> Option<PinMode> {
281        use PinModeRequirement::*;
282        match self {
283            Only(one) => Some(*one),
284            Any => None,
285        }
286    }
287}
288
289impl<B: Borrow<Cid>> PartialEq<PinMode> for PinKind<B> {
290    fn eq(&self, other: &PinMode) -> bool {
291        matches!(
292            (self, other),
293            (PinKind::IndirectFrom(_), PinMode::Indirect)
294                | (PinKind::Direct, PinMode::Direct)
295                | (PinKind::Recursive(_), PinMode::Recursive)
296                | (PinKind::RecursiveIntention, PinMode::Recursive)
297        )
298    }
299}
300
301/// `PinKind` is more specific pin description for writing purposes. Implements
302/// `PartialEq<&PinMode>`. Generic over `Borrow<Cid>` to allow storing both reference and owned
303/// value of Cid.
304#[derive(Debug, PartialEq, Eq)]
305pub enum PinKind<C: Borrow<Cid>> {
306    IndirectFrom(C),
307    Direct,
308    Recursive(u64),
309    RecursiveIntention,
310}
311
312impl<C: Borrow<Cid>> PinKind<C> {
313    fn as_ref(&self) -> PinKind<&'_ Cid> {
314        match self {
315            PinKind::IndirectFrom(c) => PinKind::IndirectFrom(c.borrow()),
316            PinKind::Direct => PinKind::Direct,
317            PinKind::Recursive(count) => PinKind::Recursive(*count),
318            PinKind::RecursiveIntention => PinKind::RecursiveIntention,
319        }
320    }
321}
322
323type SubscriptionsMap = HashMap<Cid, Vec<futures::channel::oneshot::Sender<Result<Block, String>>>>;
324
325/// Describes a repo.
326/// Consolidates a blockstore, a datastore and a subscription registry.
327#[allow(clippy::type_complexity)]
328#[derive(Debug, Clone)]
329pub struct Repo {
330    pub(crate) inner: Arc<RepoInner>,
331}
332
333#[derive(Debug)]
334pub(crate) struct RepoInner {
335    online: AtomicBool,
336    initialized: AtomicBool,
337    max_storage_size: AtomicUsize,
338    block_store: Box<dyn BlockStore>,
339    data_store: Box<dyn DataStore>,
340    events: RwLock<Option<Sender<RepoEvent>>>,
341    pub(crate) subscriptions: Mutex<SubscriptionsMap>,
342    lockfile: Box<dyn Lock>,
343    pub(crate) gclock: tokio::sync::RwLock<()>,
344}
345
346/// Events used to communicate to the swarm on repo changes.
347#[derive(Debug)]
348pub enum RepoEvent {
349    /// Signals a desired block.
350    WantBlock(
351        Vec<Cid>,
352        Vec<PeerId>,
353        Option<Duration>,
354        Option<HashMap<Cid, Vec<Arc<Notify>>>>,
355    ),
356    /// Signals a desired block is no longer wanted.
357    UnwantBlock(Cid),
358    /// Signals the posession of a new block.
359    NewBlock(Block),
360    /// Signals the removal of a block.
361    RemovedBlock(Cid),
362}
363
364impl Repo {
365    pub fn new(repo_type: &mut StorageType) -> Self {
366        match repo_type {
367            StorageType::Memory => Repo::new_memory(),
368            #[cfg(not(target_arch = "wasm32"))]
369            StorageType::Disk(path) => Repo::new_fs(path),
370            #[cfg(target_arch = "wasm32")]
371            StorageType::IndexedDb { namespace } => Repo::new_idb(namespace.take()),
372            StorageType::Custom {
373                blockstore,
374                datastore,
375                lock,
376            } => Repo::new_raw(
377                blockstore.take().expect("Requires blockstore"),
378                datastore.take().expect("Requires datastore"),
379                lock.take()
380                    .expect("Requires lockfile for data and block store"),
381            ),
382        }
383    }
384
385    pub fn new_raw(
386        block_store: Box<dyn BlockStore>,
387        data_store: Box<dyn DataStore>,
388        lockfile: Box<dyn Lock>,
389    ) -> Self {
390        let inner = RepoInner {
391            initialized: AtomicBool::default(),
392            online: AtomicBool::default(),
393            block_store,
394            data_store,
395            events: Default::default(),
396            subscriptions: Default::default(),
397            lockfile,
398            max_storage_size: Default::default(),
399            gclock: Default::default(),
400        };
401        Repo {
402            inner: Arc::new(inner),
403        }
404    }
405
406    #[cfg(not(target_arch = "wasm32"))]
407    pub fn new_fs(path: impl AsRef<Path>) -> Self {
408        let path = path.as_ref().to_path_buf();
409        let mut blockstore_path = path.clone();
410        let mut datastore_path = path.clone();
411        let mut lockfile_path = path;
412        blockstore_path.push("blockstore");
413        datastore_path.push("datastore");
414        lockfile_path.push("repo_lock");
415
416        let block_store = Box::new(blockstore::flatfs::FsBlockStore::new(blockstore_path));
417        let data_store = Box::new(datastore::flatfs::FsDataStore::new(datastore_path));
418        let lockfile = Box::new(lock::FsLock::new(lockfile_path));
419        Self::new_raw(block_store, data_store, lockfile)
420    }
421
422    pub fn new_memory() -> Self {
423        let block_store = Box::new(blockstore::memory::MemBlockStore::new(Default::default()));
424        let data_store = Box::new(datastore::memory::MemDataStore::new(Default::default()));
425        let lockfile = Box::new(lock::MemLock);
426        Self::new_raw(block_store, data_store, lockfile)
427    }
428
429    #[cfg(target_arch = "wasm32")]
430    pub fn new_idb(namespace: Option<String>) -> Self {
431        let block_store = Box::new(blockstore::idb::IdbBlockStore::new(namespace.clone()));
432        let data_store = Box::new(datastore::idb::IdbDataStore::new(namespace));
433        let lockfile = Box::new(lock::MemLock);
434        Self::new_raw(block_store, data_store, lockfile)
435    }
436
437    pub fn set_max_storage_size(&self, size: usize) {
438        self.inner.max_storage_size.store(size, Ordering::SeqCst);
439    }
440
441    pub fn max_storage_size(&self) -> usize {
442        self.inner.max_storage_size.load(Ordering::SeqCst)
443    }
444
445    pub async fn migrate(&self, repo: &Self) -> Result<(), Error> {
446        if self.is_online() || repo.is_online() {
447            anyhow::bail!("Repository cannot be online");
448        }
449        let block_migration = {
450            async move {
451                let mut stream = self.list_blocks().await;
452                while let Some(cid) = stream.next().await {
453                    match self.get_block_now(&cid).await {
454                        Ok(Some(block)) => match repo.inner.block_store.put(&block).await {
455                            Ok(_) => {}
456                            Err(e) => error!("Error migrating {cid}: {e}"),
457                        },
458                        Ok(None) => error!("{cid} doesnt exist"),
459                        Err(e) => error!("Error getting block {cid}: {e}"),
460                    }
461                }
462            }
463        };
464
465        let data_migration = {
466            async move {
467                let mut data_stream = self.data_store().iter().await;
468                while let Some((k, v)) = data_stream.next().await {
469                    if let Err(e) = repo.data_store().put(&k, &v).await {
470                        error!("Unable to migrate {k:?} into repo: {e}");
471                    }
472                }
473            }
474        };
475
476        let pins_migration = {
477            async move {
478                let mut stream = self.data_store().list(None).await;
479                while let Some(Ok((cid, pin_mode))) = stream.next().await {
480                    match pin_mode {
481                        PinMode::Direct => match repo.data_store().insert_direct_pin(&cid).await {
482                            Ok(_) => {}
483                            Err(e) => error!("Unable to migrate pin {cid}: {e}"),
484                        },
485                        PinMode::Indirect => {
486                            //No need to track since we will be obtaining the reference from the pin that is recursive
487                            continue;
488                        }
489                        PinMode::Recursive => {
490                            let block = match self
491                                .get_block_now(&cid)
492                                .await
493                                .map(|block| block.and_then(|block| block.to_ipld().ok()))
494                            {
495                                Ok(Some(block)) => block,
496                                Ok(None) => continue,
497                                Err(e) => {
498                                    error!("Block {cid} does not exist but is pinned: {e}");
499                                    continue;
500                                }
501                            };
502
503                            let st = crate::refs::IpldRefs::default()
504                                .with_only_unique()
505                                .refs_of_resolved(self, vec![(cid, block.clone())].into_iter())
506                                .map_ok(|crate::refs::Edge { destination, .. }| destination)
507                                .into_stream()
508                                .boxed();
509
510                            if let Err(e) = repo.insert_recursive_pin(&cid, st).await {
511                                error!("Error migrating pin {cid}: {e}");
512                                continue;
513                            }
514                        }
515                    }
516                }
517            }
518        };
519
520        futures::join!(block_migration, data_migration, pins_migration);
521
522        Ok(())
523    }
524
525    pub(crate) fn initialize_channel(&self) -> Receiver<RepoEvent> {
526        let mut event_guard = self.inner.events.write();
527        let (sender, receiver) = channel(1);
528        debug_assert!(event_guard.is_none());
529        *event_guard = Some(sender);
530        self.set_online();
531        receiver
532    }
533
534    /// Shutdowns the repo, cancelling any pending subscriptions; Likely going away after some
535    /// refactoring, see notes on [`crate::Ipfs::exit_daemon`].
536    pub fn shutdown(&self) {
537        let mut map = self.inner.subscriptions.lock();
538        map.clear();
539        drop(map);
540        if let Some(mut event) = self.inner.events.write().take() {
541            event.close_channel()
542        }
543        self.set_offline();
544    }
545
546    pub fn is_online(&self) -> bool {
547        self.inner.online.load(Ordering::SeqCst)
548    }
549
550    pub(crate) fn set_online(&self) {
551        if self.is_online() {
552            return;
553        }
554
555        self.inner.online.store(true, Ordering::SeqCst)
556    }
557
558    pub(crate) fn set_offline(&self) {
559        if !self.is_online() {
560            return;
561        }
562
563        self.inner.online.store(false, Ordering::SeqCst)
564    }
565
566    fn repo_channel(&self) -> Option<Sender<RepoEvent>> {
567        self.inner.events.read().clone()
568    }
569
570    pub async fn init(&self) -> Result<(), Error> {
571        //Avoid initializing again
572        if self.inner.initialized.load(Ordering::SeqCst) {
573            return Ok(());
574        }
575        // Dropping the guard (even though not strictly necessary to compile) to avoid potential
576        // deadlocks if `block_store` or `data_store` were to try to access `Repo.lockfile`.
577        {
578            log::debug!("Trying lockfile");
579            self.inner.lockfile.try_exclusive()?;
580            log::debug!("lockfile tried");
581        }
582
583        self.inner.block_store.init().await?;
584        self.inner.data_store.init().await?;
585        self.inner.initialized.store(true, Ordering::SeqCst);
586        Ok(())
587    }
588
589    /// Puts a block into the block store.
590    pub fn put_block<'a>(&self, block: &'a Block) -> RepoPutBlock<'a> {
591        RepoPutBlock::new(self, block).broadcast_on_new_block(true)
592    }
593
594    /// Retrives a block from the block store, or starts fetching it from the network and awaits
595    /// until it has been fetched.
596    #[inline]
597    pub fn get_block<C: Borrow<Cid>>(&self, cid: C) -> RepoGetBlock {
598        RepoGetBlock::new(self.clone(), cid)
599    }
600
601    /// Retrives a set of blocks from the block store, or starts fetching them from the network and awaits
602    /// until it has been fetched.
603    #[inline]
604    pub fn get_blocks(&self, cids: impl IntoIterator<Item = impl Borrow<Cid>>) -> RepoGetBlocks {
605        RepoGetBlocks::new(self.clone()).blocks(cids)
606    }
607
608    /// Get the size of listed blocks
609    #[inline]
610    pub async fn get_blocks_size(&self, cids: &[Cid]) -> Result<Option<usize>, Error> {
611        self.inner.block_store.size(cids).await
612    }
613
614    /// Get the total size of the block store
615    #[inline]
616    pub async fn get_total_size(&self) -> Result<usize, Error> {
617        self.inner.block_store.total_size().await
618    }
619
620    /// Retrieves a block from the block store if it's available locally.
621    pub async fn get_block_now<C: Borrow<Cid>>(&self, cid: C) -> Result<Option<Block>, Error> {
622        let cid = cid.borrow();
623        self.inner.block_store.get(cid).await
624    }
625
626    /// Check to determine if blockstore contain a block
627    pub async fn contains<C: Borrow<Cid>>(&self, cid: C) -> Result<bool, Error> {
628        let cid = cid.borrow();
629        self.inner.block_store.contains(cid).await
630    }
631
632    /// Lists the blocks in the blockstore.
633    pub async fn list_blocks(&self) -> BoxStream<'static, Cid> {
634        self.inner.block_store.list().await
635    }
636
637    /// Remove block from the block store.
638    pub async fn remove_block<C: Borrow<Cid>>(
639        &self,
640        cid: C,
641        recursive: bool,
642    ) -> Result<Vec<Cid>, Error> {
643        let _guard = self.inner.gclock.read().await;
644        let cid = cid.borrow();
645        if self.is_pinned(cid).await? {
646            return Err(anyhow::anyhow!("block to remove is pinned"));
647        }
648
649        let list = match recursive {
650            true => {
651                let mut list = self.recursive_collections(*cid).await;
652                // ensure the first root block is apart of the list
653                list.insert(*cid);
654                list
655            }
656            false => BTreeSet::from_iter(std::iter::once(*cid)),
657        };
658
659        let list = stream::iter(
660            FuturesOrdered::from_iter(list.into_iter().map(|cid| async move { cid }))
661                .filter_map(|cid| async move {
662                    (!self.is_pinned(&cid).await.unwrap_or_default()).then_some(cid)
663                })
664                .collect::<Vec<Cid>>()
665                .await,
666        )
667        .boxed();
668
669        let removed = self
670            .inner
671            .block_store
672            .remove_many(list)
673            .await
674            .collect::<Vec<_>>()
675            .await;
676
677        for cid in &removed {
678            // notify ipfs task about the removed blocks
679            if let Some(mut events) = self.repo_channel() {
680                let _ = events.send(RepoEvent::RemovedBlock(*cid)).await;
681            }
682        }
683        Ok(removed)
684    }
685
686    fn recursive_collections(&self, cid: Cid) -> BoxFuture<'_, BTreeSet<Cid>> {
687        async move {
688            let block = match self.get_block_now(&cid).await {
689                Ok(Some(block)) => block,
690                _ => return BTreeSet::default(),
691            };
692
693            let mut references: BTreeSet<Cid> = BTreeSet::new();
694            if block.references(&mut references).is_err() {
695                return BTreeSet::default();
696            }
697
698            let mut list = BTreeSet::new();
699
700            for cid in &references {
701                let mut inner_list = self.recursive_collections(*cid).await;
702                list.append(&mut inner_list);
703            }
704
705            references.append(&mut list);
706            references
707        }
708        .boxed()
709    }
710
711    /// Pins a given Cid recursively or directly (non-recursively).
712    ///
713    /// Pins on a block are additive in sense that a previously directly (non-recursively) pinned
714    /// can be made recursive, but removing the recursive pin on the block removes also the direct
715    /// pin as well.
716    ///
717    /// Pinning a Cid recursively (for supported dag-protobuf and dag-cbor) will walk its
718    /// references and pin the references indirectly. When a Cid is pinned indirectly it will keep
719    /// its previous direct or recursive pin and be indirect in addition.
720    ///
721    /// Recursively pinned Cids cannot be re-pinned non-recursively but non-recursively pinned Cids
722    /// can be "upgraded to" being recursively pinned.
723    pub fn pin<C: Borrow<Cid>>(&self, cid: C) -> RepoInsertPin {
724        RepoInsertPin::new(self.clone(), cid)
725    }
726
727    /// Unpins a given Cid recursively or only directly.
728    ///
729    /// Recursively unpinning a previously only directly pinned Cid will remove the direct pin.
730    ///
731    /// Unpinning an indirectly pinned Cid is not possible other than through its recursively
732    /// pinned tree roots.
733    pub fn remove_pin<C: Borrow<Cid>>(&self, cid: C) -> RepoRemovePin {
734        RepoRemovePin::new(self.clone(), cid)
735    }
736
737    pub fn fetch<C: Borrow<Cid>>(&self, cid: C) -> RepoFetch {
738        RepoFetch::new(self.clone(), cid)
739    }
740
741    /// Pins a given Cid recursively or directly (non-recursively).
742    pub(crate) async fn insert_pin(
743        &self,
744        cid: &Cid,
745        recursive: bool,
746        local_only: bool,
747    ) -> Result<(), Error> {
748        let mut pin_fut = self.pin(cid);
749        if recursive {
750            pin_fut = pin_fut.recursive();
751        }
752        if local_only {
753            pin_fut = pin_fut.local();
754        }
755        pin_fut.await
756    }
757
758    /// Inserts a direct pin for a `Cid`.
759    pub(crate) async fn insert_direct_pin(&self, cid: &Cid) -> Result<(), Error> {
760        self.inner.data_store.insert_direct_pin(cid).await
761    }
762
763    /// Inserts a recursive pin for a `Cid`.
764    pub(crate) async fn insert_recursive_pin(
765        &self,
766        cid: &Cid,
767        refs: References<'_>,
768    ) -> Result<(), Error> {
769        self.inner.data_store.insert_recursive_pin(cid, refs).await
770    }
771
772    /// Removes a direct pin for a `Cid`.
773    pub(crate) async fn remove_direct_pin(&self, cid: &Cid) -> Result<(), Error> {
774        self.inner.data_store.remove_direct_pin(cid).await
775    }
776
777    /// Removes a recursive pin for a `Cid`.
778    pub(crate) async fn remove_recursive_pin(
779        &self,
780        cid: &Cid,
781        refs: References<'_>,
782    ) -> Result<(), Error> {
783        // FIXME: not really sure why is there not an easier way to to transfer control
784        self.inner.data_store.remove_recursive_pin(cid, refs).await
785    }
786
787    /// Function to perform a basic cleanup of unpinned blocks
788    pub(crate) async fn cleanup(&self) -> Result<Vec<Cid>, Error> {
789        let repo = self.clone();
790
791        let blocks = repo.list_blocks().await;
792        let pins = repo
793            .list_pins(None)
794            .await
795            .filter_map(|result| futures::future::ready(result.map(|(cid, _)| cid).ok()))
796            .collect::<Vec<_>>()
797            .await;
798
799        let stream = async_stream::stream! {
800            for await cid in blocks {
801                if pins.contains(&cid) {
802                    continue;
803                }
804                yield cid;
805            }
806        }
807        .boxed();
808
809        let removed_blocks = self
810            .inner
811            .block_store
812            .remove_many(stream)
813            .await
814            .collect::<Vec<_>>()
815            .await;
816
817        Ok(removed_blocks)
818    }
819
820    /// Checks if a `Cid` is pinned.
821    pub async fn is_pinned<C: Borrow<Cid>>(&self, cid: C) -> Result<bool, Error> {
822        let cid = cid.borrow();
823        self.inner.data_store.is_pinned(cid).await
824    }
825
826    pub async fn list_pins(
827        &self,
828        mode: impl Into<Option<PinMode>>,
829    ) -> BoxStream<'static, Result<(Cid, PinMode), Error>> {
830        let mode = mode.into();
831        self.inner.data_store.list(mode).await
832    }
833
834    pub async fn query_pins(
835        &self,
836        cids: Vec<Cid>,
837        requirement: impl Into<Option<PinMode>>,
838    ) -> Result<Vec<(Cid, PinKind<Cid>)>, Error> {
839        let requirement = requirement.into();
840        self.inner.data_store.query(cids, requirement).await
841    }
842}
843
844pub struct GCGuard<'a> {
845    _g: RwLockReadGuard<'a, ()>,
846}
847
848impl Repo {
849    /// Hold a guard to prevent GC from running until this guard has dropped
850    /// Note: Until this guard drops, the GC task, if enabled, would not perform any cleanup.
851    ///       If the GC task is running, this guard will await until GC finishes
852    pub async fn gc_guard(&self) -> GCGuard {
853        let _g = self.inner.gclock.read().await;
854        GCGuard { _g }
855    }
856
857    pub fn data_store(&self) -> &dyn DataStore {
858        &*self.inner.data_store
859    }
860}
861
862pub struct RepoGetBlock {
863    instance: RepoGetBlocks,
864}
865
866impl RepoGetBlock {
867    pub fn new<C: Borrow<Cid>>(repo: Repo, cid: C) -> Self {
868        let instance = RepoGetBlocks::new(repo).block(cid);
869        Self { instance }
870    }
871
872    pub fn span<S: Borrow<Span>>(mut self, span: S) -> Self {
873        self.instance = self.instance.span(span);
874        self
875    }
876
877    pub fn timeout(mut self, timeout: impl Into<Option<Duration>>) -> Self {
878        self.instance = self.instance.timeout(timeout);
879        self
880    }
881
882    pub fn local(mut self) -> Self {
883        self.instance = self.instance.local();
884        self
885    }
886
887    pub fn set_local(mut self, local: bool) -> Self {
888        self.instance = self.instance.set_local(local);
889        self
890    }
891
892    /// Peer that may contain the block
893    pub fn provider(mut self, peer_id: PeerId) -> Self {
894        self.instance = self.instance.provider(peer_id);
895        self
896    }
897
898    /// List of peers that may contain the block
899    pub fn providers(mut self, providers: impl IntoIterator<Item = impl Borrow<PeerId>>) -> Self {
900        self.instance = self.instance.providers(providers);
901        self
902    }
903}
904
905impl Future for RepoGetBlock {
906    type Output = Result<Block, Error>;
907    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
908        let this = &mut self;
909        match futures::ready!(this.instance.poll_next_unpin(cx)) {
910            Some(result) => Poll::Ready(result),
911            None => Poll::Ready(Err(anyhow::anyhow!("block does not exist"))),
912        }
913    }
914}
915
916pub struct RepoGetBlocks {
917    repo: Option<Repo>,
918    cids: IndexSet<Cid>,
919    providers: IndexSet<PeerId>,
920    local: bool,
921    span: Span,
922    timeout: Option<Duration>,
923    stream: Option<BoxStream<'static, Result<Block, Error>>>,
924}
925
926impl RepoGetBlocks {
927    pub fn new(repo: Repo) -> Self {
928        Self {
929            repo: Some(repo),
930            cids: IndexSet::new(),
931            providers: IndexSet::new(),
932            local: false,
933            span: Span::current(),
934            timeout: None,
935            stream: None,
936        }
937    }
938
939    pub fn blocks(mut self, cids: impl IntoIterator<Item = impl Borrow<Cid>>) -> Self {
940        self.cids.extend(cids.into_iter().map(|cid| *cid.borrow()));
941        self
942    }
943
944    pub fn block<C: Borrow<Cid>>(self, cid: C) -> Self {
945        self.blocks([cid])
946    }
947
948    pub fn span<S: Borrow<Span>>(mut self, span: S) -> Self {
949        let span = span.borrow();
950        self.span = span.clone();
951        self
952    }
953
954    pub fn timeout(mut self, timeout: impl Into<Option<Duration>>) -> Self {
955        self.timeout = timeout.into();
956        self
957    }
958
959    pub fn local(mut self) -> Self {
960        self.local = true;
961        self
962    }
963
964    pub fn set_local(mut self, local: bool) -> Self {
965        self.local = local;
966        self
967    }
968
969    pub fn provider<C: Borrow<PeerId>>(self, peer_id: C) -> Self {
970        self.providers([peer_id])
971    }
972
973    pub fn providers(mut self, providers: impl IntoIterator<Item = impl Borrow<PeerId>>) -> Self {
974        self.providers
975            .extend(providers.into_iter().map(|k| *k.borrow()));
976        self
977    }
978}
979
980impl Stream for RepoGetBlocks {
981    type Item = Result<Block, Error>;
982
983    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
984        if self.stream.is_none() && self.repo.is_none() {
985            return Poll::Ready(None);
986        }
987
988        let this = &mut *self;
989
990        loop {
991            match &mut this.stream {
992                Some(stream) => {
993                    let _g = this.span.enter();
994                    match futures::ready!(stream.poll_next_unpin(cx)) {
995                        Some(item) => return Poll::Ready(Some(item)),
996                        None => {
997                            this.stream.take();
998                            return Poll::Ready(None);
999                        }
1000                    }
1001                }
1002                None => {
1003                    let repo = this.repo.take().expect("valid repo instance");
1004                    let providers = std::mem::take(&mut this.providers);
1005                    let cids = std::mem::take(&mut this.cids);
1006                    let local_only = this.local;
1007                    let timeout = this.timeout;
1008
1009                    let st = async_stream::stream! {
1010                        let _guard = repo.gc_guard().await;
1011                        let mut missing: IndexSet<Cid> = cids.clone();
1012                        for cid in &cids {
1013                            if let Ok(Some(block)) = repo.get_block_now(cid).await {
1014                                yield Ok(block);
1015                                missing.shift_remove(cid);
1016                            }
1017                        }
1018
1019                        if missing.is_empty() {
1020                            return;
1021                        }
1022
1023                        if local_only || !repo.is_online() {
1024                            yield Err(anyhow::anyhow!("Unable to locate missing blocks {missing:?}"));
1025                            return;
1026                        }
1027
1028                        let mut events = match repo.repo_channel() {
1029                            Some(events) => events,
1030                            None => {
1031                                yield Err(anyhow::anyhow!("Channel is not available"));
1032                                return;
1033                            }
1034                        };
1035
1036                        let mut notified: HashMap<Cid, Vec<_>> = HashMap::new();
1037
1038                        let timeout = timeout.or(Some(Duration::from_secs(60)));
1039
1040                        let mut blocks = FuturesOrdered::new();
1041
1042                        for cid in &missing {
1043                            let cid = *cid;
1044                            let (tx, rx) = futures::channel::oneshot::channel();
1045                            repo.inner
1046                                .subscriptions
1047                                .lock()
1048                                .entry(cid)
1049                                .or_default()
1050                                .push(tx);
1051
1052                            let mut events = events.clone();
1053                            let signal = Arc::new(Notify::new());
1054                            let s2 = signal.clone();
1055                            let task = async move {
1056                                let block_fut = rx;
1057                                let notified_fut = signal.notified();
1058                                futures::pin_mut!(notified_fut);
1059
1060                                match futures::future::select(block_fut, notified_fut).await {
1061                                    Either::Left((Ok(Ok(block)), _)) => Ok::<_, Error>(block),
1062                                    Either::Left((Ok(Err(e)), _)) => Err::<_, Error>(anyhow::anyhow!("{e}")),
1063                                    Either::Left((Err(e), _)) => Err::<_, Error>(e.into()),
1064                                    Either::Right(((), _)) => {
1065                                        Err::<_, Error>(anyhow::anyhow!("request for {cid} has been cancelled"))
1066                                    }
1067                                }
1068                            }
1069                            .map_err(move |e| {
1070                                // Although we request would eventually be cancelled if timeout or cancelled, we can still signal to swarm
1071                                // about the block being unwanted for future changes.
1072                                _ = events.try_send(RepoEvent::UnwantBlock(cid));
1073                                e
1074                            })
1075                            .boxed();
1076
1077                            notified.entry(cid).or_default().push(s2);
1078
1079                            blocks.push_back(task);
1080                        }
1081
1082                        events
1083                            .send(RepoEvent::WantBlock(
1084                                Vec::from_iter(missing),
1085                                Vec::from_iter(providers),
1086                                timeout,
1087                                Some(notified),
1088                            ))
1089                            .await
1090                            .ok();
1091
1092                        for await block in blocks {
1093                            yield block
1094                        }
1095                    };
1096
1097                    this.stream.replace(Box::pin(st));
1098                }
1099            }
1100        }
1101    }
1102}
1103
1104impl IntoFuture for RepoGetBlocks {
1105    type Output = Result<Vec<Block>, Error>;
1106    type IntoFuture = BoxFuture<'static, Self::Output>;
1107    fn into_future(self) -> Self::IntoFuture {
1108        async move {
1109            let col = self.try_collect().await?;
1110            Ok(col)
1111        }
1112        .boxed()
1113    }
1114}
1115
1116pub struct RepoPutBlock<'a> {
1117    repo: Repo,
1118    block: &'a Block,
1119    span: Option<Span>,
1120    broadcast_on_new_block: bool,
1121}
1122
1123impl<'a> RepoPutBlock<'a> {
1124    fn new(repo: &Repo, block: &'a Block) -> Self {
1125        Self {
1126            repo: repo.clone(),
1127            block,
1128            span: None,
1129            broadcast_on_new_block: true,
1130        }
1131    }
1132
1133    pub fn broadcast_on_new_block(mut self, v: bool) -> Self {
1134        self.broadcast_on_new_block = v;
1135        self
1136    }
1137
1138    pub fn span(mut self, span: Span) -> Self {
1139        self.span = Some(span);
1140        self
1141    }
1142}
1143
1144impl IntoFuture for RepoPutBlock<'_> {
1145    type IntoFuture = BoxFuture<'static, Self::Output>;
1146    type Output = Result<Cid, Error>;
1147    fn into_future(self) -> Self::IntoFuture {
1148        let block = self.block.clone();
1149        let span = self.span.unwrap_or(Span::current());
1150        let span = debug_span!(parent: &span, "put_block", cid = %block.cid());
1151        async move {
1152            let _guard = self.repo.inner.gclock.read().await;
1153            let (cid, res) = self.repo.inner.block_store.put(&block).await?;
1154
1155            if let BlockPut::NewBlock = res {
1156                if self.broadcast_on_new_block {
1157                    if let Some(mut event) = self.repo.repo_channel() {
1158                        _ = event.send(RepoEvent::NewBlock(block.clone())).await;
1159                    }
1160                }
1161                let list = self.repo.inner.subscriptions.lock().remove(&cid);
1162                if let Some(mut list) = list {
1163                    for ch in list.drain(..) {
1164                        let block = block.clone();
1165                        let _ = ch.send(Ok(block));
1166                    }
1167                }
1168            }
1169
1170            Ok(cid)
1171        }
1172        .instrument(span)
1173        .boxed()
1174    }
1175}
1176
1177pub struct RepoFetch {
1178    repo: Repo,
1179    cid: Cid,
1180    span: Option<Span>,
1181    providers: Vec<PeerId>,
1182    recursive: bool,
1183    timeout: Option<Duration>,
1184    refs: crate::refs::IpldRefs,
1185}
1186
1187impl RepoFetch {
1188    pub fn new<C: Borrow<Cid>>(repo: Repo, cid: C) -> Self {
1189        let cid = cid.borrow();
1190        Self {
1191            repo,
1192            cid: *cid,
1193            recursive: false,
1194            providers: vec![],
1195            timeout: None,
1196            refs: Default::default(),
1197            span: None,
1198        }
1199    }
1200
1201    /// Fetch blocks recursively
1202    pub fn recursive(mut self) -> Self {
1203        self.recursive = true;
1204        self
1205    }
1206
1207    /// Peer that may contain the block
1208    pub fn provider(mut self, peer_id: PeerId) -> Self {
1209        if !self.providers.contains(&peer_id) {
1210            self.providers.push(peer_id);
1211        }
1212        self
1213    }
1214
1215    /// List of peers that may contain the block
1216    pub fn providers(mut self, providers: &[PeerId]) -> Self {
1217        self.providers = providers.to_vec();
1218        self
1219    }
1220
1221    /// Fetch blocks to a specific depth
1222    pub fn depth(mut self, depth: u64) -> Self {
1223        self.refs = self.refs.with_max_depth(depth);
1224        self
1225    }
1226
1227    /// Duration to fetch the block from the network before
1228    /// timing out
1229    pub fn timeout(mut self, duration: Duration) -> Self {
1230        self.timeout.replace(duration);
1231        self.refs = self.refs.with_timeout(duration);
1232        self
1233    }
1234
1235    pub fn exit_on_error(mut self) -> Self {
1236        self.refs = self.refs.with_exit_on_error();
1237        self
1238    }
1239
1240    /// Set tracing span
1241    pub fn span(mut self, span: Span) -> Self {
1242        self.span = Some(span);
1243        self
1244    }
1245}
1246
1247impl IntoFuture for RepoFetch {
1248    type Output = Result<(), Error>;
1249
1250    type IntoFuture = BoxFuture<'static, Self::Output>;
1251
1252    fn into_future(self) -> Self::IntoFuture {
1253        let cid = self.cid;
1254        let span = self.span.unwrap_or(Span::current());
1255        let recursive = self.recursive;
1256        let repo = self.repo;
1257        let span = debug_span!(parent: &span, "fetch", cid = %cid, recursive);
1258        let providers = self.providers;
1259        let timeout = self.timeout;
1260        async move {
1261            // Although getting a block adds a guard, we will add a read guard here a head of time so we can hold it throughout this future
1262            let _g = repo.inner.gclock.read().await;
1263            let block = repo
1264                .get_block(cid)
1265                .providers(&providers)
1266                .timeout(timeout)
1267                .await?;
1268
1269            if !recursive {
1270                return Ok(());
1271            }
1272            let ipld = block.to_ipld()?;
1273
1274            let mut st = self
1275                .refs
1276                .with_only_unique()
1277                .providers(&providers)
1278                .refs_of_resolved(&repo, vec![(cid, ipld.clone())])
1279                .map_ok(|crate::refs::Edge { destination, .. }| destination)
1280                .into_stream()
1281                .boxed();
1282
1283            while let Some(_c) = st.try_next().await? {}
1284
1285            Ok(())
1286        }
1287        .instrument(span)
1288        .boxed()
1289    }
1290}
1291
1292pub struct RepoInsertPin {
1293    repo: Repo,
1294    cid: Cid,
1295    span: Option<Span>,
1296    providers: Vec<PeerId>,
1297    recursive: bool,
1298    timeout: Option<Duration>,
1299    local: bool,
1300    refs: crate::refs::IpldRefs,
1301}
1302
1303impl RepoInsertPin {
1304    pub fn new<C: Borrow<Cid>>(repo: Repo, cid: C) -> Self {
1305        let cid = cid.borrow();
1306        Self {
1307            repo,
1308            cid: *cid,
1309            recursive: false,
1310            providers: vec![],
1311            local: false,
1312            timeout: None,
1313            refs: Default::default(),
1314            span: None,
1315        }
1316    }
1317
1318    /// Recursively pin blocks
1319    pub fn recursive(mut self) -> Self {
1320        self.recursive = true;
1321        self
1322    }
1323
1324    /// Pin local blocks only
1325    pub fn local(mut self) -> Self {
1326        self.local = true;
1327        self.refs = self.refs.with_existing_blocks();
1328        self
1329    }
1330
1331    /// Set a flag to pin local blocks only
1332    pub fn set_local(mut self, local: bool) -> Self {
1333        self.local = local;
1334        if local {
1335            self.refs = self.refs.with_existing_blocks();
1336        }
1337        self
1338    }
1339
1340    /// Peer that may contain the block to pin
1341    pub fn provider(mut self, peer_id: PeerId) -> Self {
1342        if !self.providers.contains(&peer_id) {
1343            self.providers.push(peer_id);
1344        }
1345        self
1346    }
1347
1348    /// List of peers that may contain the block to pin
1349    pub fn providers(mut self, providers: &[PeerId]) -> Self {
1350        self.providers = providers.into();
1351        self
1352    }
1353
1354    /// Pin to a specific depth of the graph
1355    pub fn depth(mut self, depth: u64) -> Self {
1356        self.refs = self.refs.with_max_depth(depth);
1357        self
1358    }
1359
1360    /// Duration to fetch the block from the network before
1361    /// timing out
1362    pub fn timeout(mut self, duration: Duration) -> Self {
1363        self.timeout.replace(duration);
1364        self.refs = self.refs.with_timeout(duration);
1365        self
1366    }
1367
1368    pub fn exit_on_error(mut self) -> Self {
1369        self.refs = self.refs.with_exit_on_error();
1370        self
1371    }
1372
1373    /// Set tracing span
1374    pub fn span(mut self, span: Span) -> Self {
1375        self.span = Some(span);
1376        self
1377    }
1378}
1379
1380impl IntoFuture for RepoInsertPin {
1381    type Output = Result<(), Error>;
1382
1383    type IntoFuture = BoxFuture<'static, Self::Output>;
1384
1385    fn into_future(self) -> Self::IntoFuture {
1386        let cid = self.cid;
1387        let local = self.local;
1388        let span = self.span.unwrap_or(Span::current());
1389        let recursive = self.recursive;
1390        let repo = self.repo;
1391        let span = debug_span!(parent: &span, "insert_pin", cid = %cid, recursive);
1392        let providers = self.providers;
1393        let timeout = self.timeout;
1394        async move {
1395            // Although getting a block adds a guard, we will add a read guard here a head of time so we can hold it throughout this future
1396            let _g = repo.inner.gclock.read().await;
1397            let block = repo
1398                .get_block(cid)
1399                .providers(&providers)
1400                .set_local(local)
1401                .timeout(timeout)
1402                .await?;
1403
1404            if !recursive {
1405                repo.insert_direct_pin(&cid).await?
1406            } else {
1407                let ipld = block.to_ipld()?;
1408
1409                let st = self
1410                    .refs
1411                    .with_only_unique()
1412                    .providers(&providers)
1413                    .refs_of_resolved(&repo, vec![(cid, ipld.clone())])
1414                    .map_ok(|crate::refs::Edge { destination, .. }| destination)
1415                    .into_stream()
1416                    .boxed();
1417
1418                repo.insert_recursive_pin(&cid, st).await?
1419            }
1420            Ok(())
1421        }
1422        .instrument(span)
1423        .boxed()
1424    }
1425}
1426
1427pub struct RepoRemovePin {
1428    repo: Repo,
1429    cid: Cid,
1430    span: Option<Span>,
1431    recursive: bool,
1432    refs: crate::refs::IpldRefs,
1433}
1434
1435impl RepoRemovePin {
1436    pub fn new<C: Borrow<Cid>>(repo: Repo, cid: C) -> Self {
1437        let cid = cid.borrow();
1438        Self {
1439            repo,
1440            cid: *cid,
1441            recursive: false,
1442            refs: Default::default(),
1443            span: None,
1444        }
1445    }
1446
1447    /// Recursively unpin blocks
1448    pub fn recursive(mut self) -> Self {
1449        self.recursive = true;
1450        self
1451    }
1452
1453    /// Set tracing span
1454    pub fn span(mut self, span: Span) -> Self {
1455        self.span = Some(span);
1456        self
1457    }
1458}
1459
1460impl IntoFuture for RepoRemovePin {
1461    type Output = Result<(), Error>;
1462
1463    type IntoFuture = BoxFuture<'static, Self::Output>;
1464
1465    fn into_future(self) -> Self::IntoFuture {
1466        let cid = self.cid;
1467        let span = self.span.unwrap_or(Span::current());
1468        let recursive = self.recursive;
1469        let repo = self.repo;
1470
1471        let span = debug_span!(parent: &span, "remove_pin", cid = %cid, recursive);
1472        async move {
1473            let _g = repo.inner.gclock.read().await;
1474            if !recursive {
1475                repo.remove_direct_pin(&cid).await
1476            } else {
1477                // start walking refs of the root after loading it
1478
1479                let block = match repo.get_block_now(&cid).await? {
1480                    Some(b) => b,
1481                    None => {
1482                        return Err(anyhow::anyhow!("pinned root not found: {}", cid));
1483                    }
1484                };
1485
1486                let ipld = block.to_ipld()?;
1487                let st = self
1488                    .refs
1489                    .with_only_unique()
1490                    .with_existing_blocks()
1491                    .refs_of_resolved(&repo, vec![(cid, ipld)])
1492                    .map_ok(|crate::refs::Edge { destination, .. }| destination)
1493                    .into_stream()
1494                    .boxed();
1495
1496                repo.remove_recursive_pin(&cid, st).await
1497            }
1498        }
1499        .instrument(span)
1500        .boxed()
1501    }
1502}