1use crate::error::Error;
3use crate::Block;
4use connexa::prelude::identity::PeerId;
5use core::fmt::Debug;
6use futures::channel::mpsc::{channel, Receiver, Sender};
7use futures::future::{BoxFuture, Either};
8use futures::sink::SinkExt;
9use futures::stream::{self, BoxStream, FuturesOrdered};
10use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt};
11use indexmap::IndexSet;
12use ipld_core::cid::Cid;
13use parking_lot::{Mutex, RwLock};
14use std::borrow::Borrow;
15use std::collections::{BTreeSet, HashMap};
16use std::future::{Future, IntoFuture};
17#[allow(unused_imports)]
18use std::path::Path;
19use std::pin::Pin;
20use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
21use std::sync::Arc;
22use std::task::{Context, Poll};
23use std::time::Duration;
24use std::{error, fmt, io};
25use tokio::sync::{Notify, RwLockReadGuard};
26use tracing::{Instrument, Span};
27
28#[macro_use]
29#[cfg(test)]
30mod common_tests;
31
32pub use store::{blockstore, datastore, default_impl::DefaultStorage};
33
34pub mod lock;
35
36#[cfg(not(target_arch = "wasm32"))]
38pub(crate) mod paths;
39mod store;
40
41#[derive(Debug, PartialEq, Eq)]
43pub enum BlockPut {
44 NewBlock,
46 Existed,
48}
49
50#[derive(Debug)]
52pub enum BlockRm {
53 Removed(Cid),
55 }
57
58#[derive(Debug)]
61pub enum BlockRmError {
62 NotFound(Cid),
65}
66
67pub trait StoreOpt {
68 fn into_opt(self) -> Self;
69}
70
71pub trait BlockStore: Debug + Send + Sync {
73 fn init(&self) -> impl Future<Output = Result<(), Error>> + Send;
74
75 fn contains(&self, cid: &Cid) -> impl Future<Output = Result<bool, Error>> + Send;
77 fn get(&self, cid: &Cid) -> impl Future<Output = Result<Option<Block>, Error>> + Send;
79 fn size(&self, cid: &[Cid]) -> impl Future<Output = Result<Option<usize>, Error>> + Send;
81 fn total_size(&self) -> impl Future<Output = Result<usize, Error>> + Send;
83 fn put(&self, block: &Block) -> impl Future<Output = Result<(Cid, BlockPut), Error>> + Send;
85 fn remove(&self, cid: &Cid) -> impl Future<Output = Result<(), Error>> + Send;
87 fn remove_many(
89 &self,
90 blocks: BoxStream<'static, Cid>,
91 ) -> impl Future<Output = BoxStream<'static, Cid>> + Send;
92 fn list(&self) -> impl Future<Output = BoxStream<'static, Cid>> + Send;
94}
95
96pub trait DataStore: PinStore + Debug + Send + Sync {
98 fn init(&self) -> impl Future<Output = Result<(), Error>> + Send;
99 fn contains(&self, key: &[u8]) -> impl Future<Output = Result<bool, Error>> + Send;
101 fn get(&self, key: &[u8]) -> impl Future<Output = Result<Option<Vec<u8>>, Error>> + Send;
103 fn put(&self, key: &[u8], value: &[u8]) -> impl Future<Output = Result<(), Error>> + Send;
105 fn remove(&self, key: &[u8]) -> impl Future<Output = Result<(), Error>> + Send;
107 fn iter(
109 &self,
110 ) -> impl Future<Output = futures::stream::BoxStream<'static, (Vec<u8>, Vec<u8>)>> + Send;
111}
112
113#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
114pub struct GCConfig {
115 pub duration: Duration,
118
119 pub trigger: GCTrigger,
121}
122
123impl Default for GCConfig {
124 fn default() -> Self {
125 Self {
126 duration: Duration::from_secs(60 * 60),
127 trigger: GCTrigger::default(),
128 }
129 }
130}
131
132#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, PartialOrd, Ord)]
133pub enum GCTrigger {
134 At {
136 size: usize,
137 },
138
139 AtStorage,
140
141 #[default]
143 None,
144}
145
146#[derive(Debug)]
148pub enum LockError {
149 RepoInUse,
150 LockFileOpenFailed(io::Error),
151}
152
153impl fmt::Display for LockError {
154 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
155 let msg = match self {
156 LockError::RepoInUse => "The repository is already being used by an IPFS instance.",
157 LockError::LockFileOpenFailed(_) => "Failed to open repository lock file.",
158 };
159
160 write!(f, "{msg}")
161 }
162}
163
164impl From<io::Error> for LockError {
165 fn from(error: io::Error) -> Self {
166 match error.kind() {
167 io::ErrorKind::WouldBlock => LockError::RepoInUse,
170 _ => LockError::LockFileOpenFailed(error),
171 }
172 }
173}
174
175impl error::Error for LockError {
176 fn source(&self) -> Option<&(dyn error::Error + 'static)> {
177 if let Self::LockFileOpenFailed(error) = self {
178 Some(error)
179 } else {
180 None
181 }
182 }
183}
184
185pub trait Lock: Debug + Send + Sync {
190 fn try_exclusive(&self) -> Result<(), LockError>;
192}
193
194type References<'a> = BoxStream<'a, Result<Cid, crate::refs::IpldRefsError>>;
195
196pub trait PinStore: Debug + Send + Sync {
197 fn is_pinned(&self, block: &Cid) -> impl Future<Output = Result<bool, Error>> + Send;
198
199 fn insert_direct_pin(&self, target: &Cid) -> impl Future<Output = Result<(), Error>> + Send;
200
201 fn insert_recursive_pin(
202 &self,
203 target: &Cid,
204 referenced: References<'_>,
205 ) -> impl Future<Output = Result<(), Error>> + Send;
206
207 fn remove_direct_pin(&self, target: &Cid) -> impl Future<Output = Result<(), Error>> + Send;
208
209 fn remove_recursive_pin(
210 &self,
211 target: &Cid,
212 referenced: References<'_>,
213 ) -> impl Future<Output = Result<(), Error>> + Send;
214
215 fn list(
216 &self,
217 mode: Option<PinMode>,
218 ) -> impl Future<Output = futures::stream::BoxStream<'static, Result<(Cid, PinMode), Error>>> + Send;
219
220 fn query(
223 &self,
224 ids: Vec<Cid>,
225 requirement: Option<PinMode>,
226 ) -> impl Future<Output = Result<Vec<(Cid, PinKind<Cid>)>, Error>> + Send;
227}
228
229#[derive(Debug, PartialEq, Eq, Clone, Copy)]
231pub enum PinMode {
232 Indirect,
233 Direct,
234 Recursive,
235}
236
237#[derive(Debug, Clone, Copy)]
240enum PinModeRequirement {
241 Only(PinMode),
242 Any,
243}
244
245impl From<Option<PinMode>> for PinModeRequirement {
246 fn from(filter: Option<PinMode>) -> Self {
247 match filter {
248 Some(one) => PinModeRequirement::Only(one),
249 None => PinModeRequirement::Any,
250 }
251 }
252}
253
254#[allow(dead_code)]
255impl PinModeRequirement {
256 fn is_indirect_or_any(&self) -> bool {
257 use PinModeRequirement::*;
258 match self {
259 Only(PinMode::Indirect) | Any => true,
260 Only(_) => false,
261 }
262 }
263
264 fn matches<P: PartialEq<PinMode>>(&self, other: &P) -> bool {
265 use PinModeRequirement::*;
266 match self {
267 Only(one) if other == one => true,
268 Only(_) => false,
269 Any => true,
270 }
271 }
272
273 fn required(&self) -> Option<PinMode> {
274 use PinModeRequirement::*;
275 match self {
276 Only(one) => Some(*one),
277 Any => None,
278 }
279 }
280}
281
282impl<B: Borrow<Cid>> PartialEq<PinMode> for PinKind<B> {
283 fn eq(&self, other: &PinMode) -> bool {
284 matches!(
285 (self, other),
286 (PinKind::IndirectFrom(_), PinMode::Indirect)
287 | (PinKind::Direct, PinMode::Direct)
288 | (PinKind::Recursive(_), PinMode::Recursive)
289 | (PinKind::RecursiveIntention, PinMode::Recursive)
290 )
291 }
292}
293
294#[derive(Debug, PartialEq, Eq)]
298pub enum PinKind<C: Borrow<Cid>> {
299 IndirectFrom(C),
300 Direct,
301 Recursive(u64),
302 RecursiveIntention,
303}
304
305impl<C: Borrow<Cid>> PinKind<C> {
306 fn as_ref(&self) -> PinKind<&'_ Cid> {
307 match self {
308 PinKind::IndirectFrom(c) => PinKind::IndirectFrom(c.borrow()),
309 PinKind::Direct => PinKind::Direct,
310 PinKind::Recursive(count) => PinKind::Recursive(*count),
311 PinKind::RecursiveIntention => PinKind::RecursiveIntention,
312 }
313 }
314}
315
316type SubscriptionsMap = HashMap<Cid, Vec<futures::channel::oneshot::Sender<Result<Block, String>>>>;
317
318pub trait StorageTypes: RepoTypes {}
320impl<T: RepoTypes> StorageTypes for T {}
321
322pub trait RepoTypes: Clone + Send + Sync + 'static {
323 type TBlockStore: BlockStore;
325 type TDataStore: DataStore;
327 type TLock: Lock;
328}
329
330#[derive(Debug, Clone)]
333pub struct Repo<S: RepoTypes> {
334 pub(crate) inner: Arc<RepoInner<S>>,
335}
336
337#[derive(Debug)]
338pub(crate) struct RepoInner<S: RepoTypes> {
339 online: AtomicBool,
340 initialized: AtomicBool,
341 max_storage_size: AtomicUsize,
342 block_store: S::TBlockStore,
343 data_store: S::TDataStore,
344 events: RwLock<Option<Sender<RepoEvent>>>,
345 pub(crate) subscriptions: Mutex<SubscriptionsMap>,
346 lockfile: S::TLock,
347 pub(crate) gclock: tokio::sync::RwLock<()>,
348}
349
350#[derive(Debug)]
352pub enum RepoEvent {
353 WantBlock(
355 Vec<Cid>,
356 Vec<PeerId>,
357 Option<Duration>,
358 Option<HashMap<Cid, Vec<Arc<Notify>>>>,
359 ),
360 UnwantBlock(Cid),
362 NewBlock(Block),
364 RemovedBlock(Cid),
366}
367
368impl Repo<DefaultStorage> {
369 #[cfg(not(target_arch = "wasm32"))]
390 pub fn new_fs(path: impl AsRef<Path>) -> Self {
391 let mut default = DefaultStorage::default();
392
393 let path = path.as_ref().to_path_buf();
394 let mut blockstore_path = path.clone();
395 let mut datastore_path = path.clone();
396 let mut lockfile_path = path;
397 blockstore_path.push("blockstore");
398 datastore_path.push("datastore");
399 lockfile_path.push("repo_lock");
400
401 default.set_blockstore_path(blockstore_path);
402 default.set_datastore_path(datastore_path);
403 default.set_lockfile(lockfile_path);
404
405 Self::new_raw(default.clone(), default.clone(), default)
406 }
407
408 pub fn new_memory() -> Self {
409 let default = DefaultStorage::default();
410 Self::new_raw(default.clone(), default.clone(), default)
411 }
412
413 #[cfg(target_arch = "wasm32")]
414 pub fn new_idb(namespace: Option<String>) -> Self {
415 let mut default = DefaultStorage::default();
416 default.set_namespace(namespace);
417 Self::new_raw(default.clone(), default.clone(), default)
418 }
419}
420
421impl<S: RepoTypes> Repo<S> {
422 pub fn new_raw(
423 block_store: S::TBlockStore,
424 data_store: S::TDataStore,
425 lockfile: S::TLock,
426 ) -> Self {
427 let inner = RepoInner {
428 initialized: AtomicBool::default(),
429 online: AtomicBool::default(),
430 block_store,
431 data_store,
432 events: Default::default(),
433 subscriptions: Default::default(),
434 lockfile,
435 max_storage_size: Default::default(),
436 gclock: Default::default(),
437 };
438 Repo {
439 inner: Arc::new(inner),
440 }
441 }
442
443 pub fn set_max_storage_size(&self, size: usize) {
444 self.inner.max_storage_size.store(size, Ordering::SeqCst);
445 }
446
447 pub fn max_storage_size(&self) -> usize {
448 self.inner.max_storage_size.load(Ordering::SeqCst)
449 }
450
451 pub async fn migrate(&self, repo: &Self) -> Result<(), Error> {
452 if self.is_online() || repo.is_online() {
453 anyhow::bail!("Repository cannot be online");
454 }
455 let block_migration = {
456 async move {
457 let mut stream = self.list_blocks().await;
458 while let Some(cid) = stream.next().await {
459 match self.get_block_now(&cid).await {
460 Ok(Some(block)) => match repo.inner.block_store.put(&block).await {
461 Ok(_) => {}
462 Err(e) => error!("Error migrating {cid}: {e}"),
463 },
464 Ok(None) => error!("{cid} doesnt exist"),
465 Err(e) => error!("Error getting block {cid}: {e}"),
466 }
467 }
468 }
469 };
470
471 let data_migration = {
472 async move {
473 let mut data_stream = self.data_store().iter().await;
474 while let Some((k, v)) = data_stream.next().await {
475 if let Err(e) = repo.data_store().put(&k, &v).await {
476 error!("Unable to migrate {k:?} into repo: {e}");
477 }
478 }
479 }
480 };
481
482 let pins_migration = {
483 async move {
484 let mut stream = self.data_store().list(None).await;
485 while let Some(Ok((cid, pin_mode))) = stream.next().await {
486 match pin_mode {
487 PinMode::Direct => match repo.data_store().insert_direct_pin(&cid).await {
488 Ok(_) => {}
489 Err(e) => error!("Unable to migrate pin {cid}: {e}"),
490 },
491 PinMode::Indirect => {
492 continue;
494 }
495 PinMode::Recursive => {
496 let block = match self
497 .get_block_now(&cid)
498 .await
499 .map(|block| block.and_then(|block| block.to_ipld().ok()))
500 {
501 Ok(Some(block)) => block,
502 Ok(None) => continue,
503 Err(e) => {
504 error!("Block {cid} does not exist but is pinned: {e}");
505 continue;
506 }
507 };
508
509 let st = crate::refs::IpldRefs::default()
510 .with_only_unique()
511 .refs_of_resolved(self, vec![(cid, block.clone())].into_iter())
512 .map_ok(|crate::refs::Edge { destination, .. }| destination)
513 .into_stream()
514 .boxed();
515
516 if let Err(e) = repo.insert_recursive_pin(&cid, st).await {
517 error!("Error migrating pin {cid}: {e}");
518 continue;
519 }
520 }
521 }
522 }
523 }
524 };
525
526 futures::join!(block_migration, data_migration, pins_migration);
527
528 Ok(())
529 }
530
531 pub(crate) fn initialize_channel(&self) -> Receiver<RepoEvent> {
532 let mut event_guard = self.inner.events.write();
533 let (sender, receiver) = channel(1);
534 debug_assert!(event_guard.is_none());
535 *event_guard = Some(sender);
536 self.set_online();
537 receiver
538 }
539
540 pub fn shutdown(&self) {
543 let mut map = self.inner.subscriptions.lock();
544 map.clear();
545 drop(map);
546 if let Some(mut event) = self.inner.events.write().take() {
547 event.close_channel()
548 }
549 self.set_offline();
550 }
551
552 pub fn is_online(&self) -> bool {
553 self.inner.online.load(Ordering::SeqCst)
554 }
555
556 pub(crate) fn set_online(&self) {
557 if self.is_online() {
558 return;
559 }
560
561 self.inner.online.store(true, Ordering::SeqCst)
562 }
563
564 pub(crate) fn set_offline(&self) {
565 if !self.is_online() {
566 return;
567 }
568
569 self.inner.online.store(false, Ordering::SeqCst)
570 }
571
572 fn repo_channel(&self) -> Option<Sender<RepoEvent>> {
573 self.inner.events.read().clone()
574 }
575
576 pub async fn init(&self) -> Result<(), Error> {
577 if self.inner.initialized.load(Ordering::SeqCst) {
579 return Ok(());
580 }
581 {
584 tracing::debug!("Trying lockfile");
585 self.inner.lockfile.try_exclusive()?;
586 tracing::debug!("lockfile tried");
587 }
588
589 self.inner.block_store.init().await?;
590 self.inner.data_store.init().await?;
591 self.inner.initialized.store(true, Ordering::SeqCst);
592 Ok(())
593 }
594
595 pub fn put_block(&self, block: &Block) -> RepoPutBlock<S> {
597 RepoPutBlock::new(self, block).broadcast_on_new_block(true)
598 }
599
600 #[inline]
603 pub fn get_block<C: Borrow<Cid>>(&self, cid: C) -> RepoGetBlock<S> {
604 RepoGetBlock::new(Repo::clone(self), cid)
605 }
606
607 #[inline]
610 pub fn get_blocks(&self, cids: impl IntoIterator<Item = impl Borrow<Cid>>) -> RepoGetBlocks<S> {
611 RepoGetBlocks::new(Repo::clone(self)).blocks(cids)
612 }
613
614 #[inline]
616 pub async fn get_blocks_size(&self, cids: &[Cid]) -> Result<Option<usize>, Error> {
617 self.inner.block_store.size(cids).await
618 }
619
620 #[inline]
622 pub async fn get_total_size(&self) -> Result<usize, Error> {
623 self.inner.block_store.total_size().await
624 }
625
626 pub async fn get_block_now<C: Borrow<Cid>>(&self, cid: C) -> Result<Option<Block>, Error> {
628 let cid = cid.borrow();
629 self.inner.block_store.get(cid).await
630 }
631
632 pub async fn contains<C: Borrow<Cid>>(&self, cid: C) -> Result<bool, Error> {
634 let cid = cid.borrow();
635 self.inner.block_store.contains(cid).await
636 }
637
638 pub async fn list_blocks(&self) -> BoxStream<'static, Cid> {
640 self.inner.block_store.list().await
641 }
642
643 pub async fn remove_block<C: Borrow<Cid>>(
645 &self,
646 cid: C,
647 recursive: bool,
648 ) -> Result<Vec<Cid>, Error> {
649 let _guard = self.inner.gclock.read().await;
650 let cid = cid.borrow();
651 if self.is_pinned(cid).await? {
652 return Err(anyhow::anyhow!("block to remove is pinned"));
653 }
654
655 let list = match recursive {
656 true => {
657 let mut list = self.recursive_collections(*cid).await;
658 list.insert(*cid);
660 list
661 }
662 false => BTreeSet::from_iter(std::iter::once(*cid)),
663 };
664
665 let list = stream::iter(
666 FuturesOrdered::from_iter(list.into_iter().map(|cid| async move { cid }))
667 .filter_map(|cid| async move {
668 (!self.is_pinned(&cid).await.unwrap_or_default()).then_some(cid)
669 })
670 .collect::<Vec<Cid>>()
671 .await,
672 )
673 .boxed();
674
675 let removed = self
676 .inner
677 .block_store
678 .remove_many(list)
679 .await
680 .collect::<Vec<_>>()
681 .await;
682
683 for cid in &removed {
684 if let Some(mut events) = self.repo_channel() {
686 let _ = events.send(RepoEvent::RemovedBlock(*cid)).await;
687 }
688 }
689 Ok(removed)
690 }
691
692 fn recursive_collections(&self, cid: Cid) -> BoxFuture<'_, BTreeSet<Cid>> {
693 async move {
694 let block = match self.get_block_now(&cid).await {
695 Ok(Some(block)) => block,
696 _ => return BTreeSet::default(),
697 };
698
699 let mut references: BTreeSet<Cid> = BTreeSet::new();
700 if block.references(&mut references).is_err() {
701 return BTreeSet::default();
702 }
703
704 let mut list = BTreeSet::new();
705
706 for cid in &references {
707 let mut inner_list = self.recursive_collections(*cid).await;
708 list.append(&mut inner_list);
709 }
710
711 references.append(&mut list);
712 references
713 }
714 .boxed()
715 }
716
717 pub fn pin<C: Borrow<Cid>>(&self, cid: C) -> RepoInsertPin<S> {
730 RepoInsertPin::new(Repo::clone(self), cid)
731 }
732
733 pub fn remove_pin<C: Borrow<Cid>>(&self, cid: C) -> RepoRemovePin<S> {
740 RepoRemovePin::new(Repo::clone(self), cid)
741 }
742
743 pub fn fetch<C: Borrow<Cid>>(&self, cid: C) -> RepoFetch<S> {
744 RepoFetch::new(Repo::clone(self), cid)
745 }
746
747 pub(crate) async fn insert_pin(
749 &self,
750 cid: &Cid,
751 recursive: bool,
752 local_only: bool,
753 ) -> Result<(), Error> {
754 let mut pin_fut = self.pin(cid);
755 if recursive {
756 pin_fut = pin_fut.recursive();
757 }
758 if local_only {
759 pin_fut = pin_fut.local();
760 }
761 pin_fut.await
762 }
763
764 pub(crate) async fn insert_direct_pin(&self, cid: &Cid) -> Result<(), Error> {
766 self.inner.data_store.insert_direct_pin(cid).await
767 }
768
769 pub(crate) async fn insert_recursive_pin(
771 &self,
772 cid: &Cid,
773 refs: References<'_>,
774 ) -> Result<(), Error> {
775 self.inner.data_store.insert_recursive_pin(cid, refs).await
776 }
777
778 pub(crate) async fn remove_direct_pin(&self, cid: &Cid) -> Result<(), Error> {
780 self.inner.data_store.remove_direct_pin(cid).await
781 }
782
783 pub(crate) async fn remove_recursive_pin(
785 &self,
786 cid: &Cid,
787 refs: References<'_>,
788 ) -> Result<(), Error> {
789 self.inner.data_store.remove_recursive_pin(cid, refs).await
791 }
792
793 pub(crate) async fn cleanup(&self) -> Result<Vec<Cid>, Error> {
795 let repo = self.clone();
796
797 let blocks = repo.list_blocks().await;
798 let pins = repo
799 .list_pins(None)
800 .await
801 .filter_map(|result| futures::future::ready(result.map(|(cid, _)| cid).ok()))
802 .collect::<Vec<_>>()
803 .await;
804
805 let stream = async_stream::stream! {
806 for await cid in blocks {
807 if pins.contains(&cid) {
808 continue;
809 }
810 yield cid;
811 }
812 }
813 .boxed();
814
815 let removed_blocks = self
816 .inner
817 .block_store
818 .remove_many(stream)
819 .await
820 .collect::<Vec<_>>()
821 .await;
822
823 Ok(removed_blocks)
824 }
825
826 pub async fn is_pinned<C: Borrow<Cid>>(&self, cid: C) -> Result<bool, Error> {
828 let cid = cid.borrow();
829 self.inner.data_store.is_pinned(cid).await
830 }
831
832 pub async fn list_pins(
833 &self,
834 mode: impl Into<Option<PinMode>>,
835 ) -> BoxStream<'static, Result<(Cid, PinMode), Error>> {
836 let mode = mode.into();
837 self.inner.data_store.list(mode).await
838 }
839
840 pub async fn query_pins(
841 &self,
842 cids: Vec<Cid>,
843 requirement: impl Into<Option<PinMode>>,
844 ) -> Result<Vec<(Cid, PinKind<Cid>)>, Error> {
845 let requirement = requirement.into();
846 self.inner.data_store.query(cids, requirement).await
847 }
848}
849
850pub struct GCGuard<'a> {
851 _g: RwLockReadGuard<'a, ()>,
852}
853
854impl<S: RepoTypes> Repo<S> {
855 pub async fn gc_guard(&self) -> GCGuard<'_> {
859 let _g = self.inner.gclock.read().await;
860 GCGuard { _g }
861 }
862
863 pub fn data_store(&self) -> &S::TDataStore {
864 &self.inner.data_store
865 }
866}
867
868pub struct RepoGetBlock<S: RepoTypes> {
869 instance: RepoGetBlocks<S>,
870}
871
872impl<S: RepoTypes> RepoGetBlock<S> {
873 pub fn new(repo: Repo<S>, cid: impl Borrow<Cid>) -> Self {
874 let instance = RepoGetBlocks::new(repo).block(cid);
875 Self { instance }
876 }
877
878 pub fn span(mut self, span: impl Borrow<Span>) -> Self {
879 self.instance = self.instance.span(span);
880 self
881 }
882
883 pub fn timeout(mut self, timeout: impl Into<Option<Duration>>) -> Self {
884 self.instance = self.instance.timeout(timeout);
885 self
886 }
887
888 pub fn local(mut self) -> Self {
889 self.instance = self.instance.local();
890 self
891 }
892
893 pub fn set_local(mut self, local: bool) -> Self {
894 self.instance = self.instance.set_local(local);
895 self
896 }
897
898 pub fn provider(mut self, peer_id: PeerId) -> Self {
900 self.instance = self.instance.provider(peer_id);
901 self
902 }
903
904 pub fn providers(mut self, providers: impl IntoIterator<Item = impl Borrow<PeerId>>) -> Self {
906 self.instance = self.instance.providers(providers);
907 self
908 }
909}
910
911impl<S: RepoTypes> Future for RepoGetBlock<S> {
912 type Output = Result<Block, Error>;
913 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
914 let this = &mut self;
915 match futures::ready!(this.instance.poll_next_unpin(cx)) {
916 Some(result) => Poll::Ready(result),
917 None => Poll::Ready(Err(anyhow::anyhow!("block does not exist"))),
918 }
919 }
920}
921
922pub struct RepoGetBlocks<S: RepoTypes> {
923 repo: Option<Repo<S>>,
924 cids: IndexSet<Cid>,
925 providers: IndexSet<PeerId>,
926 local: bool,
927 span: Span,
928 timeout: Option<Duration>,
929 stream: Option<BoxStream<'static, Result<Block, Error>>>,
930}
931
932impl<S: RepoTypes> RepoGetBlocks<S> {
933 pub fn new(repo: Repo<S>) -> Self {
934 Self {
935 repo: Some(repo),
936 cids: IndexSet::new(),
937 providers: IndexSet::new(),
938 local: false,
939 span: Span::current(),
940 timeout: None,
941 stream: None,
942 }
943 }
944
945 pub fn blocks(mut self, cids: impl IntoIterator<Item = impl Borrow<Cid>>) -> Self {
946 self.cids.extend(cids.into_iter().map(|cid| *cid.borrow()));
947 self
948 }
949
950 pub fn block(self, cid: impl Borrow<Cid>) -> Self {
951 self.blocks([cid])
952 }
953
954 pub fn span(mut self, span: impl Borrow<Span>) -> Self {
955 let span = span.borrow();
956 self.span = span.clone();
957 self
958 }
959
960 pub fn timeout(mut self, timeout: impl Into<Option<Duration>>) -> Self {
961 self.timeout = timeout.into();
962 self
963 }
964
965 pub fn local(mut self) -> Self {
966 self.local = true;
967 self
968 }
969
970 pub fn set_local(mut self, local: bool) -> Self {
971 self.local = local;
972 self
973 }
974
975 pub fn provider(self, peer_id: impl Borrow<PeerId>) -> Self {
976 self.providers([peer_id])
977 }
978
979 pub fn providers(mut self, providers: impl IntoIterator<Item = impl Borrow<PeerId>>) -> Self {
980 self.providers
981 .extend(providers.into_iter().map(|k| *k.borrow()));
982 self
983 }
984}
985
986impl<S: RepoTypes> Stream for RepoGetBlocks<S> {
987 type Item = Result<Block, Error>;
988
989 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
990 if self.stream.is_none() && self.repo.is_none() {
991 return Poll::Ready(None);
992 }
993
994 let this = &mut *self;
995
996 loop {
997 match &mut this.stream {
998 Some(stream) => {
999 let _g = this.span.enter();
1000 match futures::ready!(stream.poll_next_unpin(cx)) {
1001 Some(item) => return Poll::Ready(Some(item)),
1002 None => {
1003 this.stream.take();
1004 return Poll::Ready(None);
1005 }
1006 }
1007 }
1008 None => {
1009 let repo = this.repo.take().expect("valid repo instance");
1010 let providers = std::mem::take(&mut this.providers);
1011 let cids = std::mem::take(&mut this.cids);
1012 let local_only = this.local;
1013 let timeout = this.timeout;
1014
1015 let st = async_stream::stream! {
1016 let _guard = repo.gc_guard().await;
1017 let mut missing: IndexSet<Cid> = cids.clone();
1018 for cid in &cids {
1019 if let Ok(Some(block)) = repo.get_block_now(cid).await {
1020 yield Ok(block);
1021 missing.shift_remove(cid);
1022 }
1023 }
1024
1025 if missing.is_empty() {
1026 return;
1027 }
1028
1029 if local_only || !repo.is_online() {
1030 yield Err(anyhow::anyhow!("Unable to locate missing blocks {missing:?}"));
1031 return;
1032 }
1033
1034 let mut events = match repo.repo_channel() {
1035 Some(events) => events,
1036 None => {
1037 yield Err(anyhow::anyhow!("Channel is not available"));
1038 return;
1039 }
1040 };
1041
1042 let mut notified: HashMap<Cid, Vec<_>> = HashMap::new();
1043
1044 let timeout = timeout.or(Some(Duration::from_secs(60)));
1045
1046 let mut blocks = FuturesOrdered::new();
1047
1048 for cid in &missing {
1049 let cid = *cid;
1050 let (tx, rx) = futures::channel::oneshot::channel();
1051 repo.inner
1052 .subscriptions
1053 .lock()
1054 .entry(cid)
1055 .or_default()
1056 .push(tx);
1057
1058 let mut events = events.clone();
1059 let signal = Arc::new(Notify::new());
1060 let s2 = signal.clone();
1061 let task = async move {
1062 let block_fut = rx;
1063 let notified_fut = signal.notified();
1064 futures::pin_mut!(notified_fut);
1065
1066 match futures::future::select(block_fut, notified_fut).await {
1067 Either::Left((Ok(Ok(block)), _)) => Ok::<_, Error>(block),
1068 Either::Left((Ok(Err(e)), _)) => Err::<_, Error>(anyhow::anyhow!("{e}")),
1069 Either::Left((Err(e), _)) => Err::<_, Error>(e.into()),
1070 Either::Right(((), _)) => {
1071 Err::<_, Error>(anyhow::anyhow!("request for {cid} has been cancelled"))
1072 }
1073 }
1074 }
1075 .map_err(move |e| {
1076 _ = events.try_send(RepoEvent::UnwantBlock(cid));
1079 e
1080 })
1081 .boxed();
1082
1083 notified.entry(cid).or_default().push(s2);
1084
1085 blocks.push_back(task);
1086 }
1087
1088 events
1089 .send(RepoEvent::WantBlock(
1090 Vec::from_iter(missing),
1091 Vec::from_iter(providers),
1092 timeout,
1093 Some(notified),
1094 ))
1095 .await
1096 .ok();
1097
1098 for await block in blocks {
1099 yield block
1100 }
1101 };
1102
1103 this.stream.replace(Box::pin(st));
1104 }
1105 }
1106 }
1107 }
1108}
1109
1110impl<S: RepoTypes> IntoFuture for RepoGetBlocks<S> {
1111 type Output = Result<Vec<Block>, Error>;
1112 type IntoFuture = BoxFuture<'static, Self::Output>;
1113 fn into_future(self) -> Self::IntoFuture {
1114 async move {
1115 let col = self.try_collect().await?;
1116 Ok(col)
1117 }
1118 .boxed()
1119 }
1120}
1121
1122pub struct RepoPutBlock<S: RepoTypes> {
1123 repo: Repo<S>,
1124 block: Option<Block>,
1125 span: Option<Span>,
1126 broadcast_on_new_block: bool,
1127}
1128
1129impl<S: RepoTypes> RepoPutBlock<S> {
1130 fn new(repo: &Repo<S>, block: &Block) -> Self {
1131 let block = Some(block.clone());
1132 Self {
1133 repo: Repo::clone(repo),
1134 block,
1135 span: None,
1136 broadcast_on_new_block: true,
1137 }
1138 }
1139
1140 pub fn broadcast_on_new_block(mut self, v: bool) -> Self {
1141 self.broadcast_on_new_block = v;
1142 self
1143 }
1144
1145 pub fn span(mut self, span: Span) -> Self {
1146 self.span = Some(span);
1147 self
1148 }
1149}
1150
1151impl<S: RepoTypes> IntoFuture for RepoPutBlock<S> {
1152 type IntoFuture = BoxFuture<'static, Self::Output>;
1153 type Output = Result<Cid, Error>;
1154 fn into_future(mut self) -> Self::IntoFuture {
1155 let block = self.block.take().expect("valid block is set");
1156 let span = self.span.unwrap_or(Span::current());
1157 let span = debug_span!(parent: &span, "put_block", cid = %block.cid());
1158 async move {
1159 let _guard = self.repo.inner.gclock.read().await;
1160 let (cid, res) = self.repo.inner.block_store.put(&block).await?;
1161
1162 if let BlockPut::NewBlock = res {
1163 if self.broadcast_on_new_block {
1164 if let Some(mut event) = self.repo.repo_channel() {
1165 _ = event.send(RepoEvent::NewBlock(block.clone())).await;
1166 }
1167 }
1168 let list = self.repo.inner.subscriptions.lock().remove(&cid);
1169 if let Some(mut list) = list {
1170 for ch in list.drain(..) {
1171 let block = block.clone();
1172 let _ = ch.send(Ok(block));
1173 }
1174 }
1175 }
1176
1177 Ok(cid)
1178 }
1179 .instrument(span)
1180 .boxed()
1181 }
1182}
1183
1184pub struct RepoFetch<S: RepoTypes> {
1185 repo: Repo<S>,
1186 cid: Cid,
1187 span: Option<Span>,
1188 providers: Vec<PeerId>,
1189 recursive: bool,
1190 timeout: Option<Duration>,
1191 refs: crate::refs::IpldRefs,
1192}
1193
1194impl<S: RepoTypes> RepoFetch<S> {
1195 pub fn new<C: Borrow<Cid>>(repo: Repo<S>, cid: C) -> Self {
1196 let cid = cid.borrow();
1197 Self {
1198 repo,
1199 cid: *cid,
1200 recursive: false,
1201 providers: vec![],
1202 timeout: None,
1203 refs: Default::default(),
1204 span: None,
1205 }
1206 }
1207
1208 pub fn recursive(mut self) -> Self {
1210 self.recursive = true;
1211 self
1212 }
1213
1214 pub fn provider(mut self, peer_id: PeerId) -> Self {
1216 if !self.providers.contains(&peer_id) {
1217 self.providers.push(peer_id);
1218 }
1219 self
1220 }
1221
1222 pub fn providers(mut self, providers: &[PeerId]) -> Self {
1224 self.providers = providers.to_vec();
1225 self
1226 }
1227
1228 pub fn depth(mut self, depth: u64) -> Self {
1230 self.refs = self.refs.with_max_depth(depth);
1231 self
1232 }
1233
1234 pub fn timeout(mut self, duration: Duration) -> Self {
1237 self.timeout.replace(duration);
1238 self.refs = self.refs.with_timeout(duration);
1239 self
1240 }
1241
1242 pub fn exit_on_error(mut self) -> Self {
1243 self.refs = self.refs.with_exit_on_error();
1244 self
1245 }
1246
1247 pub fn span(mut self, span: Span) -> Self {
1249 self.span = Some(span);
1250 self
1251 }
1252}
1253
1254impl<S: RepoTypes> IntoFuture for RepoFetch<S> {
1255 type Output = Result<(), Error>;
1256
1257 type IntoFuture = BoxFuture<'static, Self::Output>;
1258
1259 fn into_future(self) -> Self::IntoFuture {
1260 let cid = self.cid;
1261 let span = self.span.unwrap_or(Span::current());
1262 let recursive = self.recursive;
1263 let repo = self.repo;
1264 let span = debug_span!(parent: &span, "fetch", cid = %cid, recursive);
1265 let providers = self.providers;
1266 let timeout = self.timeout;
1267 async move {
1268 let _g = repo.inner.gclock.read().await;
1270 let block = repo
1271 .get_block(cid)
1272 .providers(&providers)
1273 .timeout(timeout)
1274 .await?;
1275
1276 if !recursive {
1277 return Ok(());
1278 }
1279 let ipld = block.to_ipld()?;
1280
1281 let mut st = self
1282 .refs
1283 .with_only_unique()
1284 .providers(&providers)
1285 .refs_of_resolved(&repo, vec![(cid, ipld.clone())])
1286 .map_ok(|crate::refs::Edge { destination, .. }| destination)
1287 .into_stream()
1288 .boxed();
1289
1290 while let Some(_c) = st.try_next().await? {}
1291
1292 Ok(())
1293 }
1294 .instrument(span)
1295 .boxed()
1296 }
1297}
1298
1299pub struct RepoInsertPin<S: RepoTypes> {
1300 repo: Repo<S>,
1301 cid: Cid,
1302 span: Option<Span>,
1303 providers: Vec<PeerId>,
1304 recursive: bool,
1305 timeout: Option<Duration>,
1306 local: bool,
1307 refs: crate::refs::IpldRefs,
1308}
1309
1310impl<S: RepoTypes> RepoInsertPin<S> {
1311 pub fn new<C: Borrow<Cid>>(repo: Repo<S>, cid: C) -> Self {
1312 let cid = cid.borrow();
1313 Self {
1314 repo,
1315 cid: *cid,
1316 recursive: false,
1317 providers: vec![],
1318 local: false,
1319 timeout: None,
1320 refs: Default::default(),
1321 span: None,
1322 }
1323 }
1324
1325 pub fn recursive(mut self) -> Self {
1327 self.recursive = true;
1328 self
1329 }
1330
1331 pub fn local(mut self) -> Self {
1333 self.local = true;
1334 self.refs = self.refs.with_existing_blocks();
1335 self
1336 }
1337
1338 pub fn set_local(mut self, local: bool) -> Self {
1340 self.local = local;
1341 if local {
1342 self.refs = self.refs.with_existing_blocks();
1343 }
1344 self
1345 }
1346
1347 pub fn provider(mut self, peer_id: PeerId) -> Self {
1349 if !self.providers.contains(&peer_id) {
1350 self.providers.push(peer_id);
1351 }
1352 self
1353 }
1354
1355 pub fn providers(mut self, providers: &[PeerId]) -> Self {
1357 self.providers = providers.into();
1358 self
1359 }
1360
1361 pub fn depth(mut self, depth: u64) -> Self {
1363 self.refs = self.refs.with_max_depth(depth);
1364 self
1365 }
1366
1367 pub fn timeout(mut self, duration: Duration) -> Self {
1370 self.timeout.replace(duration);
1371 self.refs = self.refs.with_timeout(duration);
1372 self
1373 }
1374
1375 pub fn exit_on_error(mut self) -> Self {
1376 self.refs = self.refs.with_exit_on_error();
1377 self
1378 }
1379
1380 pub fn span(mut self, span: Span) -> Self {
1382 self.span = Some(span);
1383 self
1384 }
1385}
1386
1387impl<S: RepoTypes> IntoFuture for RepoInsertPin<S> {
1388 type Output = Result<(), Error>;
1389
1390 type IntoFuture = BoxFuture<'static, Self::Output>;
1391
1392 fn into_future(self) -> Self::IntoFuture {
1393 let cid = self.cid;
1394 let local = self.local;
1395 let span = self.span.unwrap_or(Span::current());
1396 let recursive = self.recursive;
1397 let repo = self.repo;
1398 let span = debug_span!(parent: &span, "insert_pin", cid = %cid, recursive);
1399 let providers = self.providers;
1400 let timeout = self.timeout;
1401 async move {
1402 let _g = repo.inner.gclock.read().await;
1404 let block = repo
1405 .get_block(cid)
1406 .providers(&providers)
1407 .set_local(local)
1408 .timeout(timeout)
1409 .await?;
1410
1411 if !recursive {
1412 repo.insert_direct_pin(&cid).await?
1413 } else {
1414 let ipld = block.to_ipld()?;
1415
1416 let st = self
1417 .refs
1418 .with_only_unique()
1419 .providers(&providers)
1420 .refs_of_resolved(&repo, vec![(cid, ipld.clone())])
1421 .map_ok(|crate::refs::Edge { destination, .. }| destination)
1422 .into_stream()
1423 .boxed();
1424
1425 repo.insert_recursive_pin(&cid, st).await?
1426 }
1427 Ok(())
1428 }
1429 .instrument(span)
1430 .boxed()
1431 }
1432}
1433
1434pub struct RepoRemovePin<S: RepoTypes> {
1435 repo: Repo<S>,
1436 cid: Cid,
1437 span: Option<Span>,
1438 recursive: bool,
1439 refs: crate::refs::IpldRefs,
1440}
1441
1442impl<S: RepoTypes> RepoRemovePin<S> {
1443 pub fn new<C: Borrow<Cid>>(repo: Repo<S>, cid: C) -> Self {
1444 let cid = cid.borrow();
1445 Self {
1446 repo,
1447 cid: *cid,
1448 recursive: false,
1449 refs: Default::default(),
1450 span: None,
1451 }
1452 }
1453
1454 pub fn recursive(mut self) -> Self {
1456 self.recursive = true;
1457 self
1458 }
1459
1460 pub fn span(mut self, span: Span) -> Self {
1462 self.span = Some(span);
1463 self
1464 }
1465}
1466
1467impl<S: RepoTypes> IntoFuture for RepoRemovePin<S> {
1468 type Output = Result<(), Error>;
1469
1470 type IntoFuture = BoxFuture<'static, Self::Output>;
1471
1472 fn into_future(self) -> Self::IntoFuture {
1473 let cid = self.cid;
1474 let span = self.span.unwrap_or(Span::current());
1475 let recursive = self.recursive;
1476 let repo = self.repo;
1477
1478 let span = debug_span!(parent: &span, "remove_pin", cid = %cid, recursive);
1479 async move {
1480 let _g = repo.inner.gclock.read().await;
1481 if !recursive {
1482 repo.remove_direct_pin(&cid).await
1483 } else {
1484 let block = match repo.get_block_now(&cid).await? {
1487 Some(b) => b,
1488 None => {
1489 return Err(anyhow::anyhow!("pinned root not found: {}", cid));
1490 }
1491 };
1492
1493 let ipld = block.to_ipld()?;
1494 let st = self
1495 .refs
1496 .with_only_unique()
1497 .with_existing_blocks()
1498 .refs_of_resolved(&repo, vec![(cid, ipld)])
1499 .map_ok(|crate::refs::Edge { destination, .. }| destination)
1500 .into_stream()
1501 .boxed();
1502
1503 repo.remove_recursive_pin(&cid, st).await
1504 }
1505 }
1506 .instrument(span)
1507 .boxed()
1508 }
1509}