1use crate::error::Error;
3use crate::{Block, StorageType};
4use async_trait::async_trait;
5use core::fmt::Debug;
6use futures::channel::mpsc::{channel, Receiver, Sender};
7use futures::future::{BoxFuture, Either};
8use futures::sink::SinkExt;
9use futures::stream::{self, BoxStream, FuturesOrdered};
10use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt};
11use indexmap::IndexSet;
12use ipld_core::cid::Cid;
13use libp2p::identity::PeerId;
14use parking_lot::{Mutex, RwLock};
15use std::borrow::Borrow;
16use std::collections::{BTreeSet, HashMap};
17use std::future::{Future, IntoFuture};
18#[allow(unused_imports)]
19use std::path::Path;
20use std::pin::Pin;
21use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
22use std::sync::Arc;
23use std::task::{Context, Poll};
24use std::time::Duration;
25use std::{error, fmt, io};
26use tokio::sync::{Notify, RwLockReadGuard};
27use tracing::{log, Instrument, Span};
28
29#[macro_use]
30#[cfg(test)]
31mod common_tests;
32
33pub mod blockstore;
34pub mod datastore;
35pub mod lock;
36
37#[cfg(not(target_arch = "wasm32"))]
39pub(crate) mod paths;
40
/// Outcome reported next to the `Cid` in the result of [`BlockStore::put`].
#[derive(Debug, PartialEq, Eq)]
pub enum BlockPut {
    /// The put stored a block that was not there before (reading based on the name —
    /// confirm against the store implementations). `RepoPutBlock` only broadcasts
    /// `RepoEvent::NewBlock` and wakes subscribers for this variant.
    NewBlock,
    /// Presumably: the store already held the block, so nothing new was written.
    Existed,
}
49
/// Outcome of a block removal.
///
/// NOTE(review): nothing in the visible part of this file constructs or returns this type;
/// confirm where it is used before relying on it.
#[derive(Debug)]
pub enum BlockRm {
    /// The block identified by the contained `Cid` was removed.
    Removed(Cid),
}
57
/// Error cases for a block removal.
///
/// NOTE(review): not referenced by the visible part of this file; confirm its users.
#[derive(Debug)]
pub enum BlockRmError {
    /// Presumably: no block with the contained `Cid` exists in the store.
    NotFound(Cid),
}
66
/// Storage backend for blocks, addressed by `Cid`.
///
/// `Repo` owns one implementation behind a `Box<dyn BlockStore>` and forwards its block
/// operations to it.
#[async_trait]
pub trait BlockStore: Debug + Send + Sync {
    /// Prepares the store for use; `Repo::init` calls this after acquiring the lock.
    async fn init(&self) -> Result<(), Error>;

    /// Deprecated no-op kept for compatibility; the default implementation returns `Ok(())`.
    #[deprecated]
    async fn open(&self) -> Result<(), Error> {
        Ok(())
    }
    /// Reports whether a block for `cid` is present.
    async fn contains(&self, cid: &Cid) -> Result<bool, Error>;
    /// Fetches the block for `cid`; `Ok(None)` when the store has no such block.
    async fn get(&self, cid: &Cid) -> Result<Option<Block>, Error>;
    /// Size information for the given cids.
    ///
    /// NOTE(review): presumably the combined size of the listed blocks, with `None` when
    /// it cannot be determined — confirm against the implementations.
    async fn size(&self, cid: &[Cid]) -> Result<Option<usize>, Error>;
    /// Total size of the store as reported by the implementation.
    async fn total_size(&self) -> Result<usize, Error>;
    /// Stores `block`, returning its `Cid` together with a [`BlockPut`] outcome.
    async fn put(&self, block: &Block) -> Result<(Cid, BlockPut), Error>;
    /// Removes the block for `cid`.
    async fn remove(&self, cid: &Cid) -> Result<(), Error>;
    /// Removes every block whose `Cid` is yielded by `blocks`.
    ///
    /// The returned stream is collected by `Repo` as the list of removed cids.
    async fn remove_many(&self, blocks: BoxStream<'static, Cid>) -> BoxStream<'static, Cid>;
    /// Streams the `Cid` of every block in the store.
    async fn list(&self) -> BoxStream<'static, Cid>;
}
93
/// Key/value storage backend that also carries the pin bookkeeping ([`PinStore`]).
///
/// `Repo` owns one implementation behind a `Box<dyn DataStore>`.
#[async_trait]
pub trait DataStore: PinStore + Debug + Send + Sync {
    /// Prepares the store for use; `Repo::init` calls this after the block store is initialised.
    async fn init(&self) -> Result<(), Error>;
    /// Deprecated no-op kept for compatibility; the default implementation returns `Ok(())`.
    #[deprecated]
    async fn open(&self) -> Result<(), Error> {
        Ok(())
    }
    /// Reports whether a value is stored under `key`.
    async fn contains(&self, key: &[u8]) -> Result<bool, Error>;
    /// Reads the value stored under `key`; `Ok(None)` when there is none.
    async fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Error>;
    /// Stores `value` under `key`.
    async fn put(&self, key: &[u8], value: &[u8]) -> Result<(), Error>;
    /// Removes the entry stored under `key`.
    async fn remove(&self, key: &[u8]) -> Result<(), Error>;
    /// Streams every `(key, value)` pair; `Repo::migrate` uses this to copy entries.
    async fn iter(&self) -> futures::stream::BoxStream<'static, (Vec<u8>, Vec<u8>)>;
}
113
/// Garbage-collection settings: an interval plus an optional extra trigger.
///
/// How these values are consumed is not visible in this file.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct GCConfig {
    /// Interval associated with GC runs (presumably the timer period — confirm at the
    /// place that schedules GC).
    pub duration: Duration,

    /// Additional condition that may start a GC run.
    pub trigger: GCTrigger,
}

impl Default for GCConfig {
    /// One hour interval combined with the default trigger ([`GCTrigger::None`]).
    fn default() -> Self {
        Self {
            duration: Duration::from_secs(60 * 60),
            trigger: GCTrigger::default(),
        }
    }
}

/// Extra condition for starting garbage collection.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, PartialOrd, Ord)]
pub enum GCTrigger {
    /// Trigger tied to a size threshold (presumably the stored size in bytes — confirm).
    At {
        /// The threshold value.
        size: usize,
    },

    /// Trigger tied to the repository storage limit.
    ///
    /// NOTE(review): likely relates to `Repo::max_storage_size`; confirm at the GC task.
    AtStorage,

    /// No extra trigger; this is the default.
    #[default]
    None,
}
146
/// Failure modes when acquiring the repository lock.
#[derive(Debug)]
pub enum LockError {
    /// Another holder already has the lock (mapped from `io::ErrorKind::WouldBlock`).
    RepoInUse,
    /// Any other I/O failure; the underlying error is kept and exposed via `source()`.
    LockFileOpenFailed(io::Error),
}

impl fmt::Display for LockError {
    /// Writes a fixed, human-readable sentence per variant; the wrapped I/O error is not
    /// included in the text.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(match self {
            Self::RepoInUse => "The repository is already being used by an IPFS instance.",
            Self::LockFileOpenFailed(_) => "Failed to open repository lock file.",
        })
    }
}

impl From<io::Error> for LockError {
    /// `WouldBlock` becomes [`LockError::RepoInUse`]; every other kind is wrapped in
    /// [`LockError::LockFileOpenFailed`].
    fn from(error: io::Error) -> Self {
        if error.kind() == io::ErrorKind::WouldBlock {
            Self::RepoInUse
        } else {
            Self::LockFileOpenFailed(error)
        }
    }
}

impl error::Error for LockError {
    /// Returns the wrapped I/O error for `LockFileOpenFailed`, `None` otherwise.
    fn source(&self) -> Option<&(dyn error::Error + 'static)> {
        match self {
            Self::LockFileOpenFailed(cause) => Some(cause),
            Self::RepoInUse => None,
        }
    }
}
185
/// Guard against concurrent use of one repository.
///
/// `Repo::init` calls [`Lock::try_exclusive`] before initialising the stores.
pub trait Lock: Debug + Send + Sync {
    /// Attempts to take the exclusive lock, failing with a [`LockError`] when it cannot.
    fn try_exclusive(&self) -> Result<(), LockError>;
}
194
/// Stream of `Cid`s handed to the recursive pin operations; each item may instead carry
/// an error from the reference walk.
type References<'a> = BoxStream<'a, Result<Cid, crate::refs::IpldRefsError>>;
196
/// Pin bookkeeping operations; a supertrait of [`DataStore`].
#[async_trait]
pub trait PinStore: Debug + Send + Sync {
    /// Reports whether `block` is pinned.
    ///
    /// NOTE(review): whether indirect pins count is decided by the implementation — confirm.
    async fn is_pinned(&self, block: &Cid) -> Result<bool, Error>;

    /// Records a direct pin for `target`.
    async fn insert_direct_pin(&self, target: &Cid) -> Result<(), Error>;

    /// Records a recursive pin rooted at `target`.
    ///
    /// `referenced` yields the cids reachable from `target`, as produced by the callers
    /// in this file from an `IpldRefs` walk.
    async fn insert_recursive_pin(
        &self,
        target: &Cid,
        referenced: References<'_>,
    ) -> Result<(), Error>;

    /// Removes a direct pin for `target`.
    async fn remove_direct_pin(&self, target: &Cid) -> Result<(), Error>;

    /// Removes a recursive pin rooted at `target`; `referenced` yields the cids reachable
    /// from it.
    async fn remove_recursive_pin(
        &self,
        target: &Cid,
        referenced: References<'_>,
    ) -> Result<(), Error>;

    /// Streams the stored pins as `(Cid, PinMode)` pairs.
    ///
    /// `mode` is an optional filter; `None` presumably means "all modes" — confirm.
    async fn list(
        &self,
        mode: Option<PinMode>,
    ) -> futures::stream::BoxStream<'static, Result<(Cid, PinMode), Error>>;

    /// Looks up how each of `ids` is pinned, optionally restricted to `requirement`.
    ///
    /// NOTE(review): behaviour for ids that are not pinned (error vs. omission) is defined
    /// by the implementation — confirm.
    async fn query(
        &self,
        ids: Vec<Cid>,
        requirement: Option<PinMode>,
    ) -> Result<Vec<(Cid, PinKind<Cid>)>, Error>;
}
235
/// The way a block is pinned, as reported by pin listings and used as a filter.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum PinMode {
    /// Pinned because another pin references it (see [`PinKind::IndirectFrom`]).
    Indirect,
    /// Pinned on its own, without its references.
    Direct,
    /// Pinned as the root of a recursive pin.
    Recursive,
}
243
/// Internal form of an optional [`PinMode`] filter: either one specific mode or any mode.
#[derive(Debug, Clone, Copy)]
enum PinModeRequirement {
    /// Only the given mode is acceptable.
    Only(PinMode),
    /// Every mode is acceptable.
    Any,
}
251
252impl From<Option<PinMode>> for PinModeRequirement {
253 fn from(filter: Option<PinMode>) -> Self {
254 match filter {
255 Some(one) => PinModeRequirement::Only(one),
256 None => PinModeRequirement::Any,
257 }
258 }
259}
260
261#[allow(dead_code)]
262impl PinModeRequirement {
263 fn is_indirect_or_any(&self) -> bool {
264 use PinModeRequirement::*;
265 match self {
266 Only(PinMode::Indirect) | Any => true,
267 Only(_) => false,
268 }
269 }
270
271 fn matches<P: PartialEq<PinMode>>(&self, other: &P) -> bool {
272 use PinModeRequirement::*;
273 match self {
274 Only(one) if other == one => true,
275 Only(_) => false,
276 Any => true,
277 }
278 }
279
280 fn required(&self) -> Option<PinMode> {
281 use PinModeRequirement::*;
282 match self {
283 Only(one) => Some(*one),
284 Any => None,
285 }
286 }
287}
288
289impl<B: Borrow<Cid>> PartialEq<PinMode> for PinKind<B> {
290 fn eq(&self, other: &PinMode) -> bool {
291 matches!(
292 (self, other),
293 (PinKind::IndirectFrom(_), PinMode::Indirect)
294 | (PinKind::Direct, PinMode::Direct)
295 | (PinKind::Recursive(_), PinMode::Recursive)
296 | (PinKind::RecursiveIntention, PinMode::Recursive)
297 )
298 }
299}
300
/// Detailed description of how a block is pinned, as returned by pin queries.
///
/// `C` is either an owned `Cid` or something borrowing one.
#[derive(Debug, PartialEq, Eq)]
pub enum PinKind<C: Borrow<Cid>> {
    /// Pinned indirectly; the payload identifies the pin it is reachable from
    /// (presumably the recursive root — confirm against the store implementations).
    IndirectFrom(C),
    /// Pinned directly.
    Direct,
    /// Root of a recursive pin. NOTE(review): the meaning of the `u64` is not visible in
    /// this file (possibly a count of referenced blocks) — confirm.
    Recursive(u64),
    /// Compares equal to `PinMode::Recursive`; presumably a recursive pin that has been
    /// requested but not completed — confirm.
    RecursiveIntention,
}
311
312impl<C: Borrow<Cid>> PinKind<C> {
313 fn as_ref(&self) -> PinKind<&'_ Cid> {
314 match self {
315 PinKind::IndirectFrom(c) => PinKind::IndirectFrom(c.borrow()),
316 PinKind::Direct => PinKind::Direct,
317 PinKind::Recursive(count) => PinKind::Recursive(*count),
318 PinKind::RecursiveIntention => PinKind::RecursiveIntention,
319 }
320 }
321}
322
/// Pending block requests: for each wanted `Cid`, the oneshot senders that are completed
/// with the block once it is put into the repo.
type SubscriptionsMap = HashMap<Cid, Vec<futures::channel::oneshot::Sender<Result<Block, String>>>>;
324
/// Cheaply clonable handle to the repository (block store, data store, pins and events).
///
/// Clones share the same [`RepoInner`] through an `Arc`.
#[allow(clippy::type_complexity)]
#[derive(Debug, Clone)]
pub struct Repo {
    /// Shared state behind the handle.
    pub(crate) inner: Arc<RepoInner>,
}
332
/// Shared state behind every [`Repo`] handle.
#[derive(Debug)]
pub(crate) struct RepoInner {
    /// Set by `initialize_channel`, cleared by `shutdown`.
    online: AtomicBool,
    /// Set once `Repo::init` has completed, so later calls return early.
    initialized: AtomicBool,
    /// Value stored and read by `set_max_storage_size` / `max_storage_size`.
    max_storage_size: AtomicUsize,
    /// Backend holding the blocks.
    block_store: Box<dyn BlockStore>,
    /// Backend holding key/value data and pins.
    data_store: Box<dyn DataStore>,
    /// Sender half of the repo event channel; `None` until `initialize_channel` runs and
    /// again after `shutdown`.
    events: RwLock<Option<Sender<RepoEvent>>>,
    /// Waiters for blocks that are not available locally yet.
    pub(crate) subscriptions: Mutex<SubscriptionsMap>,
    /// Lock taken exclusively during `Repo::init`.
    lockfile: Box<dyn Lock>,
    /// Read-locked by block and pin operations in this file; the write side is
    /// presumably taken by garbage collection — confirm at the GC call site.
    pub(crate) gclock: tokio::sync::RwLock<()>,
}
345
/// Events the repo sends over the channel created by `Repo::initialize_channel`.
#[derive(Debug)]
pub enum RepoEvent {
    /// Blocks are wanted from the network. Fields, in order: the missing cids, the
    /// providers to ask, an optional timeout, and per-cid `Notify` handles — notifying a
    /// handle makes the matching pending request fail as cancelled.
    WantBlock(
        Vec<Cid>,
        Vec<PeerId>,
        Option<Duration>,
        Option<HashMap<Cid, Vec<Arc<Notify>>>>,
    ),
    /// A pending request for this cid ended with an error and the block is no longer wanted.
    UnwantBlock(Cid),
    /// A block was newly stored (sent by `put_block` when broadcasting is enabled).
    NewBlock(Block),
    /// A block was removed (sent by `remove_block` for each removed cid).
    RemovedBlock(Cid),
}
363
364impl Repo {
365 pub fn new(repo_type: &mut StorageType) -> Self {
366 match repo_type {
367 StorageType::Memory => Repo::new_memory(),
368 #[cfg(not(target_arch = "wasm32"))]
369 StorageType::Disk(path) => Repo::new_fs(path),
370 #[cfg(target_arch = "wasm32")]
371 StorageType::IndexedDb { namespace } => Repo::new_idb(namespace.take()),
372 StorageType::Custom {
373 blockstore,
374 datastore,
375 lock,
376 } => Repo::new_raw(
377 blockstore.take().expect("Requires blockstore"),
378 datastore.take().expect("Requires datastore"),
379 lock.take()
380 .expect("Requires lockfile for data and block store"),
381 ),
382 }
383 }
384
385 pub fn new_raw(
386 block_store: Box<dyn BlockStore>,
387 data_store: Box<dyn DataStore>,
388 lockfile: Box<dyn Lock>,
389 ) -> Self {
390 let inner = RepoInner {
391 initialized: AtomicBool::default(),
392 online: AtomicBool::default(),
393 block_store,
394 data_store,
395 events: Default::default(),
396 subscriptions: Default::default(),
397 lockfile,
398 max_storage_size: Default::default(),
399 gclock: Default::default(),
400 };
401 Repo {
402 inner: Arc::new(inner),
403 }
404 }
405
406 #[cfg(not(target_arch = "wasm32"))]
407 pub fn new_fs(path: impl AsRef<Path>) -> Self {
408 let path = path.as_ref().to_path_buf();
409 let mut blockstore_path = path.clone();
410 let mut datastore_path = path.clone();
411 let mut lockfile_path = path;
412 blockstore_path.push("blockstore");
413 datastore_path.push("datastore");
414 lockfile_path.push("repo_lock");
415
416 let block_store = Box::new(blockstore::flatfs::FsBlockStore::new(blockstore_path));
417 let data_store = Box::new(datastore::flatfs::FsDataStore::new(datastore_path));
418 let lockfile = Box::new(lock::FsLock::new(lockfile_path));
419 Self::new_raw(block_store, data_store, lockfile)
420 }
421
422 pub fn new_memory() -> Self {
423 let block_store = Box::new(blockstore::memory::MemBlockStore::new(Default::default()));
424 let data_store = Box::new(datastore::memory::MemDataStore::new(Default::default()));
425 let lockfile = Box::new(lock::MemLock);
426 Self::new_raw(block_store, data_store, lockfile)
427 }
428
429 #[cfg(target_arch = "wasm32")]
430 pub fn new_idb(namespace: Option<String>) -> Self {
431 let block_store = Box::new(blockstore::idb::IdbBlockStore::new(namespace.clone()));
432 let data_store = Box::new(datastore::idb::IdbDataStore::new(namespace));
433 let lockfile = Box::new(lock::MemLock);
434 Self::new_raw(block_store, data_store, lockfile)
435 }
436
437 pub fn set_max_storage_size(&self, size: usize) {
438 self.inner.max_storage_size.store(size, Ordering::SeqCst);
439 }
440
441 pub fn max_storage_size(&self) -> usize {
442 self.inner.max_storage_size.load(Ordering::SeqCst)
443 }
444
445 pub async fn migrate(&self, repo: &Self) -> Result<(), Error> {
446 if self.is_online() || repo.is_online() {
447 anyhow::bail!("Repository cannot be online");
448 }
449 let block_migration = {
450 async move {
451 let mut stream = self.list_blocks().await;
452 while let Some(cid) = stream.next().await {
453 match self.get_block_now(&cid).await {
454 Ok(Some(block)) => match repo.inner.block_store.put(&block).await {
455 Ok(_) => {}
456 Err(e) => error!("Error migrating {cid}: {e}"),
457 },
458 Ok(None) => error!("{cid} doesnt exist"),
459 Err(e) => error!("Error getting block {cid}: {e}"),
460 }
461 }
462 }
463 };
464
465 let data_migration = {
466 async move {
467 let mut data_stream = self.data_store().iter().await;
468 while let Some((k, v)) = data_stream.next().await {
469 if let Err(e) = repo.data_store().put(&k, &v).await {
470 error!("Unable to migrate {k:?} into repo: {e}");
471 }
472 }
473 }
474 };
475
476 let pins_migration = {
477 async move {
478 let mut stream = self.data_store().list(None).await;
479 while let Some(Ok((cid, pin_mode))) = stream.next().await {
480 match pin_mode {
481 PinMode::Direct => match repo.data_store().insert_direct_pin(&cid).await {
482 Ok(_) => {}
483 Err(e) => error!("Unable to migrate pin {cid}: {e}"),
484 },
485 PinMode::Indirect => {
486 continue;
488 }
489 PinMode::Recursive => {
490 let block = match self
491 .get_block_now(&cid)
492 .await
493 .map(|block| block.and_then(|block| block.to_ipld().ok()))
494 {
495 Ok(Some(block)) => block,
496 Ok(None) => continue,
497 Err(e) => {
498 error!("Block {cid} does not exist but is pinned: {e}");
499 continue;
500 }
501 };
502
503 let st = crate::refs::IpldRefs::default()
504 .with_only_unique()
505 .refs_of_resolved(self, vec![(cid, block.clone())].into_iter())
506 .map_ok(|crate::refs::Edge { destination, .. }| destination)
507 .into_stream()
508 .boxed();
509
510 if let Err(e) = repo.insert_recursive_pin(&cid, st).await {
511 error!("Error migrating pin {cid}: {e}");
512 continue;
513 }
514 }
515 }
516 }
517 }
518 };
519
520 futures::join!(block_migration, data_migration, pins_migration);
521
522 Ok(())
523 }
524
525 pub(crate) fn initialize_channel(&self) -> Receiver<RepoEvent> {
526 let mut event_guard = self.inner.events.write();
527 let (sender, receiver) = channel(1);
528 debug_assert!(event_guard.is_none());
529 *event_guard = Some(sender);
530 self.set_online();
531 receiver
532 }
533
534 pub fn shutdown(&self) {
537 let mut map = self.inner.subscriptions.lock();
538 map.clear();
539 drop(map);
540 if let Some(mut event) = self.inner.events.write().take() {
541 event.close_channel()
542 }
543 self.set_offline();
544 }
545
546 pub fn is_online(&self) -> bool {
547 self.inner.online.load(Ordering::SeqCst)
548 }
549
550 pub(crate) fn set_online(&self) {
551 if self.is_online() {
552 return;
553 }
554
555 self.inner.online.store(true, Ordering::SeqCst)
556 }
557
558 pub(crate) fn set_offline(&self) {
559 if !self.is_online() {
560 return;
561 }
562
563 self.inner.online.store(false, Ordering::SeqCst)
564 }
565
566 fn repo_channel(&self) -> Option<Sender<RepoEvent>> {
567 self.inner.events.read().clone()
568 }
569
570 pub async fn init(&self) -> Result<(), Error> {
571 if self.inner.initialized.load(Ordering::SeqCst) {
573 return Ok(());
574 }
575 {
578 log::debug!("Trying lockfile");
579 self.inner.lockfile.try_exclusive()?;
580 log::debug!("lockfile tried");
581 }
582
583 self.inner.block_store.init().await?;
584 self.inner.data_store.init().await?;
585 self.inner.initialized.store(true, Ordering::SeqCst);
586 Ok(())
587 }
588
589 pub fn put_block<'a>(&self, block: &'a Block) -> RepoPutBlock<'a> {
591 RepoPutBlock::new(self, block).broadcast_on_new_block(true)
592 }
593
594 #[inline]
597 pub fn get_block<C: Borrow<Cid>>(&self, cid: C) -> RepoGetBlock {
598 RepoGetBlock::new(self.clone(), cid)
599 }
600
601 #[inline]
604 pub fn get_blocks(&self, cids: impl IntoIterator<Item = impl Borrow<Cid>>) -> RepoGetBlocks {
605 RepoGetBlocks::new(self.clone()).blocks(cids)
606 }
607
608 #[inline]
610 pub async fn get_blocks_size(&self, cids: &[Cid]) -> Result<Option<usize>, Error> {
611 self.inner.block_store.size(cids).await
612 }
613
614 #[inline]
616 pub async fn get_total_size(&self) -> Result<usize, Error> {
617 self.inner.block_store.total_size().await
618 }
619
620 pub async fn get_block_now<C: Borrow<Cid>>(&self, cid: C) -> Result<Option<Block>, Error> {
622 let cid = cid.borrow();
623 self.inner.block_store.get(cid).await
624 }
625
626 pub async fn contains<C: Borrow<Cid>>(&self, cid: C) -> Result<bool, Error> {
628 let cid = cid.borrow();
629 self.inner.block_store.contains(cid).await
630 }
631
632 pub async fn list_blocks(&self) -> BoxStream<'static, Cid> {
634 self.inner.block_store.list().await
635 }
636
637 pub async fn remove_block<C: Borrow<Cid>>(
639 &self,
640 cid: C,
641 recursive: bool,
642 ) -> Result<Vec<Cid>, Error> {
643 let _guard = self.inner.gclock.read().await;
644 let cid = cid.borrow();
645 if self.is_pinned(cid).await? {
646 return Err(anyhow::anyhow!("block to remove is pinned"));
647 }
648
649 let list = match recursive {
650 true => {
651 let mut list = self.recursive_collections(*cid).await;
652 list.insert(*cid);
654 list
655 }
656 false => BTreeSet::from_iter(std::iter::once(*cid)),
657 };
658
659 let list = stream::iter(
660 FuturesOrdered::from_iter(list.into_iter().map(|cid| async move { cid }))
661 .filter_map(|cid| async move {
662 (!self.is_pinned(&cid).await.unwrap_or_default()).then_some(cid)
663 })
664 .collect::<Vec<Cid>>()
665 .await,
666 )
667 .boxed();
668
669 let removed = self
670 .inner
671 .block_store
672 .remove_many(list)
673 .await
674 .collect::<Vec<_>>()
675 .await;
676
677 for cid in &removed {
678 if let Some(mut events) = self.repo_channel() {
680 let _ = events.send(RepoEvent::RemovedBlock(*cid)).await;
681 }
682 }
683 Ok(removed)
684 }
685
686 fn recursive_collections(&self, cid: Cid) -> BoxFuture<'_, BTreeSet<Cid>> {
687 async move {
688 let block = match self.get_block_now(&cid).await {
689 Ok(Some(block)) => block,
690 _ => return BTreeSet::default(),
691 };
692
693 let mut references: BTreeSet<Cid> = BTreeSet::new();
694 if block.references(&mut references).is_err() {
695 return BTreeSet::default();
696 }
697
698 let mut list = BTreeSet::new();
699
700 for cid in &references {
701 let mut inner_list = self.recursive_collections(*cid).await;
702 list.append(&mut inner_list);
703 }
704
705 references.append(&mut list);
706 references
707 }
708 .boxed()
709 }
710
711 pub fn pin<C: Borrow<Cid>>(&self, cid: C) -> RepoInsertPin {
724 RepoInsertPin::new(self.clone(), cid)
725 }
726
727 pub fn remove_pin<C: Borrow<Cid>>(&self, cid: C) -> RepoRemovePin {
734 RepoRemovePin::new(self.clone(), cid)
735 }
736
737 pub fn fetch<C: Borrow<Cid>>(&self, cid: C) -> RepoFetch {
738 RepoFetch::new(self.clone(), cid)
739 }
740
741 pub(crate) async fn insert_pin(
743 &self,
744 cid: &Cid,
745 recursive: bool,
746 local_only: bool,
747 ) -> Result<(), Error> {
748 let mut pin_fut = self.pin(cid);
749 if recursive {
750 pin_fut = pin_fut.recursive();
751 }
752 if local_only {
753 pin_fut = pin_fut.local();
754 }
755 pin_fut.await
756 }
757
758 pub(crate) async fn insert_direct_pin(&self, cid: &Cid) -> Result<(), Error> {
760 self.inner.data_store.insert_direct_pin(cid).await
761 }
762
763 pub(crate) async fn insert_recursive_pin(
765 &self,
766 cid: &Cid,
767 refs: References<'_>,
768 ) -> Result<(), Error> {
769 self.inner.data_store.insert_recursive_pin(cid, refs).await
770 }
771
772 pub(crate) async fn remove_direct_pin(&self, cid: &Cid) -> Result<(), Error> {
774 self.inner.data_store.remove_direct_pin(cid).await
775 }
776
777 pub(crate) async fn remove_recursive_pin(
779 &self,
780 cid: &Cid,
781 refs: References<'_>,
782 ) -> Result<(), Error> {
783 self.inner.data_store.remove_recursive_pin(cid, refs).await
785 }
786
787 pub(crate) async fn cleanup(&self) -> Result<Vec<Cid>, Error> {
789 let repo = self.clone();
790
791 let blocks = repo.list_blocks().await;
792 let pins = repo
793 .list_pins(None)
794 .await
795 .filter_map(|result| futures::future::ready(result.map(|(cid, _)| cid).ok()))
796 .collect::<Vec<_>>()
797 .await;
798
799 let stream = async_stream::stream! {
800 for await cid in blocks {
801 if pins.contains(&cid) {
802 continue;
803 }
804 yield cid;
805 }
806 }
807 .boxed();
808
809 let removed_blocks = self
810 .inner
811 .block_store
812 .remove_many(stream)
813 .await
814 .collect::<Vec<_>>()
815 .await;
816
817 Ok(removed_blocks)
818 }
819
820 pub async fn is_pinned<C: Borrow<Cid>>(&self, cid: C) -> Result<bool, Error> {
822 let cid = cid.borrow();
823 self.inner.data_store.is_pinned(cid).await
824 }
825
826 pub async fn list_pins(
827 &self,
828 mode: impl Into<Option<PinMode>>,
829 ) -> BoxStream<'static, Result<(Cid, PinMode), Error>> {
830 let mode = mode.into();
831 self.inner.data_store.list(mode).await
832 }
833
834 pub async fn query_pins(
835 &self,
836 cids: Vec<Cid>,
837 requirement: impl Into<Option<PinMode>>,
838 ) -> Result<Vec<(Cid, PinKind<Cid>)>, Error> {
839 let requirement = requirement.into();
840 self.inner.data_store.query(cids, requirement).await
841 }
842}
843
/// Holds a read guard on the repo's GC lock for as long as it is alive.
pub struct GCGuard<'a> {
    /// The guard itself; never read, only kept so the lock stays held until drop.
    _g: RwLockReadGuard<'a, ()>,
}
847
848impl Repo {
849 pub async fn gc_guard(&self) -> GCGuard {
853 let _g = self.inner.gclock.read().await;
854 GCGuard { _g }
855 }
856
857 pub fn data_store(&self) -> &dyn DataStore {
858 &*self.inner.data_store
859 }
860}
861
/// Request builder and future for a single block.
///
/// Thin wrapper around a [`RepoGetBlocks`] request that holds exactly one cid.
pub struct RepoGetBlock {
    /// The underlying multi-block request.
    instance: RepoGetBlocks,
}
865
866impl RepoGetBlock {
867 pub fn new<C: Borrow<Cid>>(repo: Repo, cid: C) -> Self {
868 let instance = RepoGetBlocks::new(repo).block(cid);
869 Self { instance }
870 }
871
872 pub fn span<S: Borrow<Span>>(mut self, span: S) -> Self {
873 self.instance = self.instance.span(span);
874 self
875 }
876
877 pub fn timeout(mut self, timeout: impl Into<Option<Duration>>) -> Self {
878 self.instance = self.instance.timeout(timeout);
879 self
880 }
881
882 pub fn local(mut self) -> Self {
883 self.instance = self.instance.local();
884 self
885 }
886
887 pub fn set_local(mut self, local: bool) -> Self {
888 self.instance = self.instance.set_local(local);
889 self
890 }
891
892 pub fn provider(mut self, peer_id: PeerId) -> Self {
894 self.instance = self.instance.provider(peer_id);
895 self
896 }
897
898 pub fn providers(mut self, providers: impl IntoIterator<Item = impl Borrow<PeerId>>) -> Self {
900 self.instance = self.instance.providers(providers);
901 self
902 }
903}
904
905impl Future for RepoGetBlock {
906 type Output = Result<Block, Error>;
907 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
908 let this = &mut self;
909 match futures::ready!(this.instance.poll_next_unpin(cx)) {
910 Some(result) => Poll::Ready(result),
911 None => Poll::Ready(Err(anyhow::anyhow!("block does not exist"))),
912 }
913 }
914}
915
/// Request builder and stream for one or more blocks.
///
/// The work starts lazily on the first poll; until then the builder methods only record
/// settings.
pub struct RepoGetBlocks {
    /// Repo to read from; taken (left as `None`) when the stream is created on first poll.
    repo: Option<Repo>,
    /// Wanted cids, deduplicated and in insertion order.
    cids: IndexSet<Cid>,
    /// Peers to ask for blocks that are missing locally.
    providers: IndexSet<PeerId>,
    /// When set, missing blocks are reported as an error instead of being requested.
    local: bool,
    /// Span entered while the inner stream is polled.
    span: Span,
    /// Timeout forwarded with the want request; a 60 second default is used when `None`.
    timeout: Option<Duration>,
    /// The running stream, created on first poll and dropped once exhausted.
    stream: Option<BoxStream<'static, Result<Block, Error>>>,
}
925
926impl RepoGetBlocks {
927 pub fn new(repo: Repo) -> Self {
928 Self {
929 repo: Some(repo),
930 cids: IndexSet::new(),
931 providers: IndexSet::new(),
932 local: false,
933 span: Span::current(),
934 timeout: None,
935 stream: None,
936 }
937 }
938
939 pub fn blocks(mut self, cids: impl IntoIterator<Item = impl Borrow<Cid>>) -> Self {
940 self.cids.extend(cids.into_iter().map(|cid| *cid.borrow()));
941 self
942 }
943
944 pub fn block<C: Borrow<Cid>>(self, cid: C) -> Self {
945 self.blocks([cid])
946 }
947
948 pub fn span<S: Borrow<Span>>(mut self, span: S) -> Self {
949 let span = span.borrow();
950 self.span = span.clone();
951 self
952 }
953
954 pub fn timeout(mut self, timeout: impl Into<Option<Duration>>) -> Self {
955 self.timeout = timeout.into();
956 self
957 }
958
959 pub fn local(mut self) -> Self {
960 self.local = true;
961 self
962 }
963
964 pub fn set_local(mut self, local: bool) -> Self {
965 self.local = local;
966 self
967 }
968
969 pub fn provider<C: Borrow<PeerId>>(self, peer_id: C) -> Self {
970 self.providers([peer_id])
971 }
972
973 pub fn providers(mut self, providers: impl IntoIterator<Item = impl Borrow<PeerId>>) -> Self {
974 self.providers
975 .extend(providers.into_iter().map(|k| *k.borrow()));
976 self
977 }
978}
979
impl Stream for RepoGetBlocks {
    type Item = Result<Block, Error>;

    /// Drives the request.
    ///
    /// The first poll builds the inner stream from the recorded settings; later polls
    /// forward to it. The inner stream first yields every block found locally, then —
    /// unless the request is local-only or the repo is offline — registers a subscription
    /// per missing cid, sends a single `RepoEvent::WantBlock` and yields the results in
    /// the order of the missing cids.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Both gone means the stream already ran to completion.
        if self.stream.is_none() && self.repo.is_none() {
            return Poll::Ready(None);
        }

        let this = &mut *self;

        loop {
            match &mut this.stream {
                Some(stream) => {
                    // Poll the inner stream inside the configured span.
                    let _g = this.span.enter();
                    match futures::ready!(stream.poll_next_unpin(cx)) {
                        Some(item) => return Poll::Ready(Some(item)),
                        None => {
                            // Exhausted: drop the stream so later polls return `None` early.
                            this.stream.take();
                            return Poll::Ready(None);
                        }
                    }
                }
                None => {
                    // First poll: move the settings out of `self` and build the stream.
                    let repo = this.repo.take().expect("valid repo instance");
                    let providers = std::mem::take(&mut this.providers);
                    let cids = std::mem::take(&mut this.cids);
                    let local_only = this.local;
                    let timeout = this.timeout;

                    let st = async_stream::stream! {
                        // Held for the lifetime of the stream (GC read guard).
                        let _guard = repo.gc_guard().await;
                        let mut missing: IndexSet<Cid> = cids.clone();
                        // Serve whatever is available locally; lookup errors count as missing.
                        for cid in &cids {
                            if let Ok(Some(block)) = repo.get_block_now(cid).await {
                                yield Ok(block);
                                missing.shift_remove(cid);
                            }
                        }

                        if missing.is_empty() {
                            return;
                        }

                        if local_only || !repo.is_online() {
                            yield Err(anyhow::anyhow!("Unable to locate missing blocks {missing:?}"));
                            return;
                        }

                        let mut events = match repo.repo_channel() {
                            Some(events) => events,
                            None => {
                                yield Err(anyhow::anyhow!("Channel is not available"));
                                return;
                            }
                        };

                        // Per-cid cancellation handles handed to the event receiver.
                        let mut notified: HashMap<Cid, Vec<_>> = HashMap::new();

                        // Default to 60 seconds when the caller set no timeout. The value is
                        // only forwarded in the event; enforcing it happens outside this file.
                        let timeout = timeout.or(Some(Duration::from_secs(60)));

                        let mut blocks = FuturesOrdered::new();

                        for cid in &missing {
                            let cid = *cid;
                            // Register a waiter that `put_block` completes when the block arrives.
                            let (tx, rx) = futures::channel::oneshot::channel();
                            repo.inner
                                .subscriptions
                                .lock()
                                .entry(cid)
                                .or_default()
                                .push(tx);

                            let mut events = events.clone();
                            let signal = Arc::new(Notify::new());
                            let s2 = signal.clone();
                            let task = async move {
                                let block_fut = rx;
                                let notified_fut = signal.notified();
                                futures::pin_mut!(notified_fut);

                                // Whichever comes first: the block (or an error from the
                                // sender side), or a cancellation through the notify handle.
                                match futures::future::select(block_fut, notified_fut).await {
                                    Either::Left((Ok(Ok(block)), _)) => Ok::<_, Error>(block),
                                    Either::Left((Ok(Err(e)), _)) => Err::<_, Error>(anyhow::anyhow!("{e}")),
                                    Either::Left((Err(e), _)) => Err::<_, Error>(e.into()),
                                    Either::Right(((), _)) => {
                                        Err::<_, Error>(anyhow::anyhow!("request for {cid} has been cancelled"))
                                    }
                                }
                            }
                            .map_err(move |e| {
                                // Best effort: tell the receiver the block is no longer wanted.
                                _ = events.try_send(RepoEvent::UnwantBlock(cid));
                                e
                            })
                            .boxed();

                            notified.entry(cid).or_default().push(s2);

                            blocks.push_back(task);
                        }

                        // One want request for all missing cids; a send failure is ignored here
                        // and surfaces through the pending tasks instead.
                        events
                            .send(RepoEvent::WantBlock(
                                Vec::from_iter(missing),
                                Vec::from_iter(providers),
                                timeout,
                                Some(notified),
                            ))
                            .await
                            .ok();

                        for await block in blocks {
                            yield block
                        }
                    };

                    // Install the stream; the loop then polls it right away.
                    this.stream.replace(Box::pin(st));
                }
            }
        }
    }
}
1103
1104impl IntoFuture for RepoGetBlocks {
1105 type Output = Result<Vec<Block>, Error>;
1106 type IntoFuture = BoxFuture<'static, Self::Output>;
1107 fn into_future(self) -> Self::IntoFuture {
1108 async move {
1109 let col = self.try_collect().await?;
1110 Ok(col)
1111 }
1112 .boxed()
1113 }
1114}
1115
/// Request builder for storing a block; await it to perform the put.
pub struct RepoPutBlock<'a> {
    /// Repo that receives the block.
    repo: Repo,
    /// Block to store; cloned when the request is turned into a future.
    block: &'a Block,
    /// Parent span for the operation; the current span is used when `None`.
    span: Option<Span>,
    /// Whether a `RepoEvent::NewBlock` is sent when the store reports a new block.
    broadcast_on_new_block: bool,
}
1122
1123impl<'a> RepoPutBlock<'a> {
1124 fn new(repo: &Repo, block: &'a Block) -> Self {
1125 Self {
1126 repo: repo.clone(),
1127 block,
1128 span: None,
1129 broadcast_on_new_block: true,
1130 }
1131 }
1132
1133 pub fn broadcast_on_new_block(mut self, v: bool) -> Self {
1134 self.broadcast_on_new_block = v;
1135 self
1136 }
1137
1138 pub fn span(mut self, span: Span) -> Self {
1139 self.span = Some(span);
1140 self
1141 }
1142}
1143
1144impl IntoFuture for RepoPutBlock<'_> {
1145 type IntoFuture = BoxFuture<'static, Self::Output>;
1146 type Output = Result<Cid, Error>;
1147 fn into_future(self) -> Self::IntoFuture {
1148 let block = self.block.clone();
1149 let span = self.span.unwrap_or(Span::current());
1150 let span = debug_span!(parent: &span, "put_block", cid = %block.cid());
1151 async move {
1152 let _guard = self.repo.inner.gclock.read().await;
1153 let (cid, res) = self.repo.inner.block_store.put(&block).await?;
1154
1155 if let BlockPut::NewBlock = res {
1156 if self.broadcast_on_new_block {
1157 if let Some(mut event) = self.repo.repo_channel() {
1158 _ = event.send(RepoEvent::NewBlock(block.clone())).await;
1159 }
1160 }
1161 let list = self.repo.inner.subscriptions.lock().remove(&cid);
1162 if let Some(mut list) = list {
1163 for ch in list.drain(..) {
1164 let block = block.clone();
1165 let _ = ch.send(Ok(block));
1166 }
1167 }
1168 }
1169
1170 Ok(cid)
1171 }
1172 .instrument(span)
1173 .boxed()
1174 }
1175}
1176
/// Request builder for fetching a block and, optionally, everything it references.
pub struct RepoFetch {
    /// Repo used for the fetch.
    repo: Repo,
    /// Root cid to fetch.
    cid: Cid,
    /// Parent span for the operation; the current span is used when `None`.
    span: Option<Span>,
    /// Peers to ask for blocks.
    providers: Vec<PeerId>,
    /// Whether referenced blocks are walked after the root was obtained.
    recursive: bool,
    /// Timeout applied to the root block request.
    timeout: Option<Duration>,
    /// Settings for the reference walk (depth, timeout, error handling).
    refs: crate::refs::IpldRefs,
}
1186
1187impl RepoFetch {
1188 pub fn new<C: Borrow<Cid>>(repo: Repo, cid: C) -> Self {
1189 let cid = cid.borrow();
1190 Self {
1191 repo,
1192 cid: *cid,
1193 recursive: false,
1194 providers: vec![],
1195 timeout: None,
1196 refs: Default::default(),
1197 span: None,
1198 }
1199 }
1200
1201 pub fn recursive(mut self) -> Self {
1203 self.recursive = true;
1204 self
1205 }
1206
1207 pub fn provider(mut self, peer_id: PeerId) -> Self {
1209 if !self.providers.contains(&peer_id) {
1210 self.providers.push(peer_id);
1211 }
1212 self
1213 }
1214
1215 pub fn providers(mut self, providers: &[PeerId]) -> Self {
1217 self.providers = providers.to_vec();
1218 self
1219 }
1220
1221 pub fn depth(mut self, depth: u64) -> Self {
1223 self.refs = self.refs.with_max_depth(depth);
1224 self
1225 }
1226
1227 pub fn timeout(mut self, duration: Duration) -> Self {
1230 self.timeout.replace(duration);
1231 self.refs = self.refs.with_timeout(duration);
1232 self
1233 }
1234
1235 pub fn exit_on_error(mut self) -> Self {
1236 self.refs = self.refs.with_exit_on_error();
1237 self
1238 }
1239
1240 pub fn span(mut self, span: Span) -> Self {
1242 self.span = Some(span);
1243 self
1244 }
1245}
1246
1247impl IntoFuture for RepoFetch {
1248 type Output = Result<(), Error>;
1249
1250 type IntoFuture = BoxFuture<'static, Self::Output>;
1251
1252 fn into_future(self) -> Self::IntoFuture {
1253 let cid = self.cid;
1254 let span = self.span.unwrap_or(Span::current());
1255 let recursive = self.recursive;
1256 let repo = self.repo;
1257 let span = debug_span!(parent: &span, "fetch", cid = %cid, recursive);
1258 let providers = self.providers;
1259 let timeout = self.timeout;
1260 async move {
1261 let _g = repo.inner.gclock.read().await;
1263 let block = repo
1264 .get_block(cid)
1265 .providers(&providers)
1266 .timeout(timeout)
1267 .await?;
1268
1269 if !recursive {
1270 return Ok(());
1271 }
1272 let ipld = block.to_ipld()?;
1273
1274 let mut st = self
1275 .refs
1276 .with_only_unique()
1277 .providers(&providers)
1278 .refs_of_resolved(&repo, vec![(cid, ipld.clone())])
1279 .map_ok(|crate::refs::Edge { destination, .. }| destination)
1280 .into_stream()
1281 .boxed();
1282
1283 while let Some(_c) = st.try_next().await? {}
1284
1285 Ok(())
1286 }
1287 .instrument(span)
1288 .boxed()
1289 }
1290}
1291
/// Request builder for pinning a block, directly or recursively.
pub struct RepoInsertPin {
    /// Repo that stores the pin.
    repo: Repo,
    /// Cid to pin.
    cid: Cid,
    /// Parent span for the operation; the current span is used when `None`.
    span: Option<Span>,
    /// Peers to ask for blocks.
    providers: Vec<PeerId>,
    /// Whether the pin covers the blocks referenced by `cid` as well.
    recursive: bool,
    /// Timeout applied to the root block request.
    timeout: Option<Duration>,
    /// Whether the root block may only come from the local store.
    local: bool,
    /// Settings for the reference walk used by recursive pins.
    refs: crate::refs::IpldRefs,
}
1302
1303impl RepoInsertPin {
1304 pub fn new<C: Borrow<Cid>>(repo: Repo, cid: C) -> Self {
1305 let cid = cid.borrow();
1306 Self {
1307 repo,
1308 cid: *cid,
1309 recursive: false,
1310 providers: vec![],
1311 local: false,
1312 timeout: None,
1313 refs: Default::default(),
1314 span: None,
1315 }
1316 }
1317
1318 pub fn recursive(mut self) -> Self {
1320 self.recursive = true;
1321 self
1322 }
1323
1324 pub fn local(mut self) -> Self {
1326 self.local = true;
1327 self.refs = self.refs.with_existing_blocks();
1328 self
1329 }
1330
1331 pub fn set_local(mut self, local: bool) -> Self {
1333 self.local = local;
1334 if local {
1335 self.refs = self.refs.with_existing_blocks();
1336 }
1337 self
1338 }
1339
1340 pub fn provider(mut self, peer_id: PeerId) -> Self {
1342 if !self.providers.contains(&peer_id) {
1343 self.providers.push(peer_id);
1344 }
1345 self
1346 }
1347
1348 pub fn providers(mut self, providers: &[PeerId]) -> Self {
1350 self.providers = providers.into();
1351 self
1352 }
1353
1354 pub fn depth(mut self, depth: u64) -> Self {
1356 self.refs = self.refs.with_max_depth(depth);
1357 self
1358 }
1359
1360 pub fn timeout(mut self, duration: Duration) -> Self {
1363 self.timeout.replace(duration);
1364 self.refs = self.refs.with_timeout(duration);
1365 self
1366 }
1367
1368 pub fn exit_on_error(mut self) -> Self {
1369 self.refs = self.refs.with_exit_on_error();
1370 self
1371 }
1372
1373 pub fn span(mut self, span: Span) -> Self {
1375 self.span = Some(span);
1376 self
1377 }
1378}
1379
1380impl IntoFuture for RepoInsertPin {
1381 type Output = Result<(), Error>;
1382
1383 type IntoFuture = BoxFuture<'static, Self::Output>;
1384
1385 fn into_future(self) -> Self::IntoFuture {
1386 let cid = self.cid;
1387 let local = self.local;
1388 let span = self.span.unwrap_or(Span::current());
1389 let recursive = self.recursive;
1390 let repo = self.repo;
1391 let span = debug_span!(parent: &span, "insert_pin", cid = %cid, recursive);
1392 let providers = self.providers;
1393 let timeout = self.timeout;
1394 async move {
1395 let _g = repo.inner.gclock.read().await;
1397 let block = repo
1398 .get_block(cid)
1399 .providers(&providers)
1400 .set_local(local)
1401 .timeout(timeout)
1402 .await?;
1403
1404 if !recursive {
1405 repo.insert_direct_pin(&cid).await?
1406 } else {
1407 let ipld = block.to_ipld()?;
1408
1409 let st = self
1410 .refs
1411 .with_only_unique()
1412 .providers(&providers)
1413 .refs_of_resolved(&repo, vec![(cid, ipld.clone())])
1414 .map_ok(|crate::refs::Edge { destination, .. }| destination)
1415 .into_stream()
1416 .boxed();
1417
1418 repo.insert_recursive_pin(&cid, st).await?
1419 }
1420 Ok(())
1421 }
1422 .instrument(span)
1423 .boxed()
1424 }
1425}
1426
/// Request builder for removing a direct or recursive pin.
pub struct RepoRemovePin {
    /// Repo that stores the pin.
    repo: Repo,
    /// Cid whose pin is removed.
    cid: Cid,
    /// Parent span for the operation; the current span is used when `None`.
    span: Option<Span>,
    /// Whether a recursive pin (instead of a direct one) is removed.
    recursive: bool,
    /// Settings for the reference walk used when removing a recursive pin.
    refs: crate::refs::IpldRefs,
}
1434
1435impl RepoRemovePin {
1436 pub fn new<C: Borrow<Cid>>(repo: Repo, cid: C) -> Self {
1437 let cid = cid.borrow();
1438 Self {
1439 repo,
1440 cid: *cid,
1441 recursive: false,
1442 refs: Default::default(),
1443 span: None,
1444 }
1445 }
1446
1447 pub fn recursive(mut self) -> Self {
1449 self.recursive = true;
1450 self
1451 }
1452
1453 pub fn span(mut self, span: Span) -> Self {
1455 self.span = Some(span);
1456 self
1457 }
1458}
1459
1460impl IntoFuture for RepoRemovePin {
1461 type Output = Result<(), Error>;
1462
1463 type IntoFuture = BoxFuture<'static, Self::Output>;
1464
1465 fn into_future(self) -> Self::IntoFuture {
1466 let cid = self.cid;
1467 let span = self.span.unwrap_or(Span::current());
1468 let recursive = self.recursive;
1469 let repo = self.repo;
1470
1471 let span = debug_span!(parent: &span, "remove_pin", cid = %cid, recursive);
1472 async move {
1473 let _g = repo.inner.gclock.read().await;
1474 if !recursive {
1475 repo.remove_direct_pin(&cid).await
1476 } else {
1477 let block = match repo.get_block_now(&cid).await? {
1480 Some(b) => b,
1481 None => {
1482 return Err(anyhow::anyhow!("pinned root not found: {}", cid));
1483 }
1484 };
1485
1486 let ipld = block.to_ipld()?;
1487 let st = self
1488 .refs
1489 .with_only_unique()
1490 .with_existing_blocks()
1491 .refs_of_resolved(&repo, vec![(cid, ipld)])
1492 .map_ok(|crate::refs::Edge { destination, .. }| destination)
1493 .into_stream()
1494 .boxed();
1495
1496 repo.remove_recursive_pin(&cid, st).await
1497 }
1498 }
1499 .instrument(span)
1500 .boxed()
1501 }
1502}