1pub use anyhow::{Error, Result};
2pub use async_trait::async_trait;
3use fnv::FnvHashSet;
4use futures::channel::oneshot;
5use futures::stream::Stream;
6pub use libipld::block::Block;
7pub use libipld::cid::Cid;
8use libipld::codec::References;
9pub use libipld::error::BlockNotFound;
10use libipld::ipld::Ipld;
11pub use libipld::multihash::{MultihashDigest, U64};
12pub use libipld::store::{Store, StoreParams};
13pub use libp2p::swarm::AddressRecord;
14pub use libp2p::{Multiaddr, PeerId};
15pub use libp2p_bitswap::{BitswapStore, BitswapSync, Query, QueryResult, QueryType};
16use std::marker::PhantomData;
17use std::sync::Arc;
18
19#[derive(Clone, Debug)]
20pub enum NetworkEvent {
21 QueryComplete(Query, QueryResult),
22}
23
24pub trait Network<S: StoreParams>: Send + Sync + 'static {
25 type Subscription: Stream<Item = NetworkEvent> + Send + Unpin;
26 fn local_peer_id(&self) -> &PeerId;
27 fn listeners(&self, tx: oneshot::Sender<Vec<Multiaddr>>);
28 fn external_addresses(&self, tx: oneshot::Sender<Vec<AddressRecord>>);
29 fn get(&self, cid: Cid);
30 fn cancel_get(&self, cid: Cid);
31 fn sync(&self, cid: Cid, syncer: Arc<dyn BitswapSync>);
32 fn cancel_sync(&self, cid: Cid);
33 fn provide(&self, cid: Cid);
34 fn unprovide(&self, cid: Cid);
35 fn subscribe(&self) -> Self::Subscription;
36}
37
38#[derive(Clone, Debug, Eq, PartialEq)]
39pub enum StorageEvent {
40 Insert(Cid),
41 Remove(Cid),
42}
43
44#[async_trait]
45pub trait Storage<S: StoreParams>: Send + Sync + 'static {
46 type Subscription: Stream<Item = StorageEvent> + Send + Unpin;
47 fn contains(&self, cid: &Cid) -> Result<bool>;
48 fn get(&self, cid: &Cid) -> Result<Option<Vec<u8>>>;
49 fn insert(&self, block: &Block<S>) -> Result<()>;
50 async fn alias<T: AsRef<[u8]> + Send + Sync>(&self, alias: T, cid: Option<&Cid>) -> Result<()>;
51 fn resolve<T: AsRef<[u8]> + Send + Sync>(&self, alias: T) -> Result<Option<Cid>>;
52 async fn pinned(&self, cid: &Cid) -> Result<Option<bool>>;
53 fn subscribe(&self) -> Self::Subscription;
54}
55
56pub struct BitswapStorage<S: StoreParams, T: Storage<S>> {
57 marker: PhantomData<S>,
58 store: Arc<T>,
59}
60
61impl<S: StoreParams, T: Storage<S>> BitswapStorage<S, T> {
62 pub fn new(store: Arc<T>) -> Self {
63 Self {
64 marker: PhantomData,
65 store,
66 }
67 }
68}
69
70impl<S: StoreParams, T: Storage<S>> BitswapStore<S> for BitswapStorage<S, T> {
71 fn contains(&self, cid: &Cid) -> bool {
72 match self.store.contains(cid) {
73 Ok(b) => b,
74 Err(err) => {
75 log::error!("contains: {:?}", err);
76 false
77 }
78 }
79 }
80
81 fn get(&self, cid: &Cid) -> Option<Vec<u8>> {
82 match self.store.get(cid) {
83 Ok(data) => data,
84 Err(err) => {
85 log::error!("contains: {:?}", err);
86 None
87 }
88 }
89 }
90
91 fn insert(&self, cid: Cid, data: Vec<u8>) {
92 let block = Block::new_unchecked(cid, data);
93 if let Err(err) = self.store.insert(&block) {
94 log::error!("insert: {:?}", err);
95 }
96 }
97}
98
99impl<S: StoreParams, T: Storage<S>> BitswapSync for BitswapStorage<S, T>
100where
101 Ipld: References<S::Codecs>,
102{
103 fn references(&self, cid: &Cid) -> Box<dyn Iterator<Item = Cid>> {
104 if let Some(data) = self.get(cid) {
105 let block = Block::<S>::new_unchecked(*cid, data);
106 let mut refs = FnvHashSet::default();
107 if block.references(&mut refs).is_ok() {
108 return Box::new(refs.into_iter());
109 }
110 }
111 Box::new(std::iter::empty())
112 }
113
114 fn contains(&self, cid: &Cid) -> bool {
115 BitswapStore::<S>::contains(self, cid)
116 }
117}