1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117
pub use anyhow::{Error, Result}; pub use async_trait::async_trait; use fnv::FnvHashSet; use futures::channel::oneshot; use futures::stream::Stream; pub use libipld::block::Block; pub use libipld::cid::Cid; use libipld::codec::References; pub use libipld::error::BlockNotFound; use libipld::ipld::Ipld; pub use libipld::multihash::{MultihashDigest, U64}; pub use libipld::store::{Store, StoreParams}; pub use libp2p::swarm::AddressRecord; pub use libp2p::{Multiaddr, PeerId}; pub use libp2p_bitswap::{BitswapStore, BitswapSync, Query, QueryResult, QueryType}; use std::marker::PhantomData; use std::sync::Arc; #[derive(Clone, Debug)] pub enum NetworkEvent { QueryComplete(Query, QueryResult), } pub trait Network<S: StoreParams>: Send + Sync + 'static { type Subscription: Stream<Item = NetworkEvent> + Send + Unpin; fn local_peer_id(&self) -> &PeerId; fn listeners(&self, tx: oneshot::Sender<Vec<Multiaddr>>); fn external_addresses(&self, tx: oneshot::Sender<Vec<AddressRecord>>); fn get(&self, cid: Cid); fn cancel_get(&self, cid: Cid); fn sync(&self, cid: Cid, syncer: Arc<dyn BitswapSync>); fn cancel_sync(&self, cid: Cid); fn provide(&self, cid: Cid); fn unprovide(&self, cid: Cid); fn subscribe(&self) -> Self::Subscription; } #[derive(Clone, Debug, Eq, PartialEq)] pub enum StorageEvent { Insert(Cid), Remove(Cid), } #[async_trait] pub trait Storage<S: StoreParams>: Send + Sync + 'static { type Subscription: Stream<Item = StorageEvent> + Send + Unpin; fn contains(&self, cid: &Cid) -> Result<bool>; fn get(&self, cid: &Cid) -> Result<Option<Vec<u8>>>; fn insert(&self, block: &Block<S>) -> Result<()>; async fn alias<T: AsRef<[u8]> + Send + Sync>(&self, alias: T, cid: Option<&Cid>) -> Result<()>; fn resolve<T: AsRef<[u8]> + Send + Sync>(&self, alias: T) -> Result<Option<Cid>>; async fn pinned(&self, cid: &Cid) -> Result<Option<bool>>; fn subscribe(&self) -> Self::Subscription; } pub struct BitswapStorage<S: StoreParams, T: Storage<S>> { marker: PhantomData<S>, store: Arc<T>, } impl<S: StoreParams, T: Storage<S>> BitswapStorage<S, T> { pub fn new(store: Arc<T>) -> Self { Self { marker: PhantomData, store, } } } impl<S: StoreParams, T: Storage<S>> BitswapStore<S> for BitswapStorage<S, T> { fn contains(&self, cid: &Cid) -> bool { match self.store.contains(cid) { Ok(b) => b, Err(err) => { log::error!("contains: {:?}", err); false } } } fn get(&self, cid: &Cid) -> Option<Vec<u8>> { match self.store.get(cid) { Ok(data) => data, Err(err) => { log::error!("contains: {:?}", err); None } } } fn insert(&self, cid: Cid, data: Vec<u8>) { let block = Block::new_unchecked(cid, data); if let Err(err) = self.store.insert(&block) { log::error!("insert: {:?}", err); } } } impl<S: StoreParams, T: Storage<S>> BitswapSync for BitswapStorage<S, T> where Ipld: References<S::Codecs>, { fn references(&self, cid: &Cid) -> Box<dyn Iterator<Item = Cid>> { if let Some(data) = self.get(cid) { let block = Block::<S>::new_unchecked(*cid, data); let mut refs = FnvHashSet::default(); if block.references(&mut refs).is_ok() { return Box::new(refs.into_iter()); } } Box::new(std::iter::empty()) } fn contains(&self, cid: &Cid) -> bool { BitswapStore::<S>::contains(self, cid) } }