ipfs_embed_core/
lib.rs

1pub use anyhow::{Error, Result};
2pub use async_trait::async_trait;
3use fnv::FnvHashSet;
4use futures::channel::oneshot;
5use futures::stream::Stream;
6pub use libipld::block::Block;
7pub use libipld::cid::Cid;
8use libipld::codec::References;
9pub use libipld::error::BlockNotFound;
10use libipld::ipld::Ipld;
11pub use libipld::multihash::{MultihashDigest, U64};
12pub use libipld::store::{Store, StoreParams};
13pub use libp2p::swarm::AddressRecord;
14pub use libp2p::{Multiaddr, PeerId};
15pub use libp2p_bitswap::{BitswapStore, BitswapSync, Query, QueryResult, QueryType};
16use std::marker::PhantomData;
17use std::sync::Arc;
18
19#[derive(Clone, Debug)]
20pub enum NetworkEvent {
21    QueryComplete(Query, QueryResult),
22}
23
24pub trait Network<S: StoreParams>: Send + Sync + 'static {
25    type Subscription: Stream<Item = NetworkEvent> + Send + Unpin;
26    fn local_peer_id(&self) -> &PeerId;
27    fn listeners(&self, tx: oneshot::Sender<Vec<Multiaddr>>);
28    fn external_addresses(&self, tx: oneshot::Sender<Vec<AddressRecord>>);
29    fn get(&self, cid: Cid);
30    fn cancel_get(&self, cid: Cid);
31    fn sync(&self, cid: Cid, syncer: Arc<dyn BitswapSync>);
32    fn cancel_sync(&self, cid: Cid);
33    fn provide(&self, cid: Cid);
34    fn unprovide(&self, cid: Cid);
35    fn subscribe(&self) -> Self::Subscription;
36}
37
38#[derive(Clone, Debug, Eq, PartialEq)]
39pub enum StorageEvent {
40    Insert(Cid),
41    Remove(Cid),
42}
43
44#[async_trait]
45pub trait Storage<S: StoreParams>: Send + Sync + 'static {
46    type Subscription: Stream<Item = StorageEvent> + Send + Unpin;
47    fn contains(&self, cid: &Cid) -> Result<bool>;
48    fn get(&self, cid: &Cid) -> Result<Option<Vec<u8>>>;
49    fn insert(&self, block: &Block<S>) -> Result<()>;
50    async fn alias<T: AsRef<[u8]> + Send + Sync>(&self, alias: T, cid: Option<&Cid>) -> Result<()>;
51    fn resolve<T: AsRef<[u8]> + Send + Sync>(&self, alias: T) -> Result<Option<Cid>>;
52    async fn pinned(&self, cid: &Cid) -> Result<Option<bool>>;
53    fn subscribe(&self) -> Self::Subscription;
54}
55
56pub struct BitswapStorage<S: StoreParams, T: Storage<S>> {
57    marker: PhantomData<S>,
58    store: Arc<T>,
59}
60
61impl<S: StoreParams, T: Storage<S>> BitswapStorage<S, T> {
62    pub fn new(store: Arc<T>) -> Self {
63        Self {
64            marker: PhantomData,
65            store,
66        }
67    }
68}
69
70impl<S: StoreParams, T: Storage<S>> BitswapStore<S> for BitswapStorage<S, T> {
71    fn contains(&self, cid: &Cid) -> bool {
72        match self.store.contains(cid) {
73            Ok(b) => b,
74            Err(err) => {
75                log::error!("contains: {:?}", err);
76                false
77            }
78        }
79    }
80
81    fn get(&self, cid: &Cid) -> Option<Vec<u8>> {
82        match self.store.get(cid) {
83            Ok(data) => data,
84            Err(err) => {
85                log::error!("contains: {:?}", err);
86                None
87            }
88        }
89    }
90
91    fn insert(&self, cid: Cid, data: Vec<u8>) {
92        let block = Block::new_unchecked(cid, data);
93        if let Err(err) = self.store.insert(&block) {
94            log::error!("insert: {:?}", err);
95        }
96    }
97}
98
99impl<S: StoreParams, T: Storage<S>> BitswapSync for BitswapStorage<S, T>
100where
101    Ipld: References<S::Codecs>,
102{
103    fn references(&self, cid: &Cid) -> Box<dyn Iterator<Item = Cid>> {
104        if let Some(data) = self.get(cid) {
105            let block = Block::<S>::new_unchecked(*cid, data);
106            let mut refs = FnvHashSet::default();
107            if block.references(&mut refs).is_ok() {
108                return Box::new(refs.into_iter());
109            }
110        }
111        Box::new(std::iter::empty())
112    }
113
114    fn contains(&self, cid: &Cid) -> bool {
115        BitswapStore::<S>::contains(self, cid)
116    }
117}