ipfs-embed-net 0.11.0

small embeddable ipfs implementation
Documentation
use crate::behaviour::{GetChannel, NetworkBackendBehaviour, SyncChannel};
use futures::stream::Stream;
use futures::{future, pin_mut};
use libipld::store::StoreParams;
use libipld::{Cid, Result};
use libp2p::core::transport::upgrade::Version;
use libp2p::core::transport::Transport;
use libp2p::dns::DnsConfig;
use libp2p::mplex::MplexConfig;
use libp2p::noise::{Keypair, NoiseConfig, X25519Spec};
use libp2p::swarm::{AddressScore, Swarm, SwarmBuilder, SwarmEvent};
use libp2p::tcp::TcpConfig;
use prometheus::Registry;
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use std::time::Duration;

mod behaviour;
mod config;

pub use crate::behaviour::{QueryId, SyncEvent};
pub use crate::config::NetworkConfig;
pub use libp2p::gossipsub::{GossipsubEvent, GossipsubMessage, MessageId, Topic, TopicHash};
pub use libp2p::kad::record::{Key, Record};
pub use libp2p::kad::{PeerRecord, Quorum};
pub use libp2p::swarm::AddressRecord;
pub use libp2p::{Multiaddr, PeerId};
pub use libp2p_bitswap::BitswapStore;

#[derive(Clone)]
pub struct NetworkService<P: StoreParams> {
    swarm: Arc<Mutex<Swarm<NetworkBackendBehaviour<P>>>>,
}

impl<P: StoreParams> NetworkService<P> {
    pub async fn new<S: BitswapStore<Params = P>>(config: NetworkConfig, store: S) -> Result<Self> {
        let dh_key = Keypair::<X25519Spec>::new()
            .into_authentic(&config.node_key)
            .unwrap();
        let transport = DnsConfig::new(
            TcpConfig::new()
                .nodelay(true)
                .upgrade(Version::V1)
                .authenticate(NoiseConfig::xx(dh_key).into_authenticated())
                .multiplex(MplexConfig::new())
                .timeout(Duration::from_secs(5))
                .boxed(),
        )?;

        let peer_id = config.peer_id();
        let behaviour = NetworkBackendBehaviour::<P>::new(config.clone(), store).await?;
        let swarm = SwarmBuilder::new(transport.boxed(), behaviour, peer_id)
            .executor(Box::new(|fut| {
                async_global_executor::spawn(fut).detach();
            }))
            .build();

        let swarm = Arc::new(Mutex::new(swarm));
        let swarm2 = swarm.clone();
        async_global_executor::spawn::<_, ()>(future::poll_fn(move |cx| {
            let mut guard = swarm.lock().unwrap();
            while let Poll::Ready(_) = {
                let swarm = &mut *guard;
                pin_mut!(swarm);
                swarm.poll_next(cx)
            } {}
            Poll::Pending
        }))
        .detach();

        Ok(Self { swarm: swarm2 })
    }

    pub fn local_peer_id(&self) -> PeerId {
        let swarm = self.swarm.lock().unwrap();
        *Swarm::local_peer_id(&swarm)
    }

    #[allow(clippy::await_holding_lock)]
    pub async fn listen_on(&self, addr: Multiaddr) -> Result<Multiaddr> {
        let mut swarm = self.swarm.lock().unwrap();
        Swarm::listen_on(&mut swarm, addr)?;
        loop {
            match swarm.next_event().await {
                SwarmEvent::NewListenAddr(addr) => {
                    tracing::info!("listening on {}", addr);
                    return Ok(addr);
                }
                SwarmEvent::ListenerClosed {
                    reason: Err(err), ..
                } => return Err(err.into()),
                _ => continue,
            }
        }
    }

    pub fn listeners(&self) -> Vec<Multiaddr> {
        let swarm = self.swarm.lock().unwrap();
        Swarm::listeners(&swarm).cloned().collect()
    }

    pub fn add_external_address(&self, addr: Multiaddr) {
        let mut swarm = self.swarm.lock().unwrap();
        Swarm::add_external_address(&mut swarm, addr, AddressScore::Infinite);
    }

    pub fn external_addresses(&self) -> Vec<AddressRecord> {
        let swarm = self.swarm.lock().unwrap();
        Swarm::external_addresses(&swarm).cloned().collect()
    }

    pub fn add_address(&self, peer: &PeerId, addr: Multiaddr) {
        let mut swarm = self.swarm.lock().unwrap();
        swarm.add_address(peer, addr);
    }

    pub fn remove_address(&self, peer: &PeerId, addr: &Multiaddr) {
        let mut swarm = self.swarm.lock().unwrap();
        swarm.remove_address(peer, addr);
    }

    pub fn dial(&self, peer: &PeerId) -> Result<()> {
        let mut swarm = self.swarm.lock().unwrap();
        Ok(Swarm::dial(&mut swarm, peer)?)
    }

    pub fn ban(&self, peer: PeerId) {
        let mut swarm = self.swarm.lock().unwrap();
        Swarm::ban_peer_id(&mut swarm, peer)
    }

    pub fn unban(&self, peer: PeerId) {
        let mut swarm = self.swarm.lock().unwrap();
        Swarm::unban_peer_id(&mut swarm, peer)
    }

    pub async fn bootstrap(&self, peers: &[(PeerId, Multiaddr)]) -> Result<()> {
        for (peer, addr) in peers {
            self.add_address(peer, addr.clone());
            self.dial(peer)?;
        }
        let rx = {
            let mut swarm = self.swarm.lock().unwrap();
            swarm.bootstrap()
        };
        tracing::trace!("started bootstrap");
        rx.await??;
        tracing::trace!("boostrap complete");
        Ok(())
    }

    pub async fn get_record(&self, key: &Key, quorum: Quorum) -> Result<Vec<PeerRecord>> {
        let rx = {
            let mut swarm = self.swarm.lock().unwrap();
            swarm.get_record(key, quorum)
        };
        Ok(rx.await??)
    }

    pub async fn put_record(&self, record: Record, quorum: Quorum) -> Result<()> {
        let rx = {
            let mut swarm = self.swarm.lock().unwrap();
            swarm.put_record(record, quorum)
        };
        rx.await??;
        Ok(())
    }

    pub fn subscribe(&self, topic: &str) -> Result<impl Stream<Item = Vec<u8>>> {
        let mut swarm = self.swarm.lock().unwrap();
        swarm.subscribe(topic)
    }

    pub fn publish(&self, topic: &str, msg: Vec<u8>) -> Result<()> {
        let mut swarm = self.swarm.lock().unwrap();
        swarm.publish(topic, msg)
    }

    pub fn remove_record(&self, key: &Key) {
        let mut swarm = self.swarm.lock().unwrap();
        swarm.remove_record(key)
    }

    pub fn get(&self, cid: Cid) -> GetQuery<P> {
        let mut swarm = self.swarm.lock().unwrap();
        let (rx, id) = swarm.get(cid);
        GetQuery {
            swarm: Some(self.swarm.clone()),
            id,
            rx,
        }
    }

    pub fn sync(&self, cid: Cid, missing: impl Iterator<Item = Cid>) -> SyncQuery<P> {
        let mut swarm = self.swarm.lock().unwrap();
        let (rx, id) = swarm.sync(cid, missing);
        SyncQuery {
            swarm: Some(self.swarm.clone()),
            id,
            rx,
        }
    }

    pub async fn provide(&self, cid: Cid) -> Result<()> {
        let rx = {
            let mut swarm = self.swarm.lock().unwrap();
            swarm.provide(cid)
        };
        rx.await??;
        Ok(())
    }

    pub fn unprovide(&self, cid: Cid) {
        let mut swarm = self.swarm.lock().unwrap();
        swarm.unprovide(cid)
    }

    pub fn register_metrics(&self, registry: &Registry) -> Result<()> {
        let swarm = self.swarm.lock().unwrap();
        swarm.register_metrics(registry)
    }
}

pub struct GetQuery<P: StoreParams> {
    swarm: Option<Arc<Mutex<Swarm<NetworkBackendBehaviour<P>>>>>,
    id: QueryId,
    rx: GetChannel,
}

impl<P: StoreParams> Future for GetQuery<P> {
    type Output = Result<()>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        match Pin::new(&mut self.rx).poll(cx) {
            Poll::Ready(Ok(result)) => Poll::Ready(result),
            Poll::Ready(Err(err)) => Poll::Ready(Err(err.into())),
            Poll::Pending => Poll::Pending,
        }
    }
}

impl<P: StoreParams> Drop for GetQuery<P> {
    fn drop(&mut self) {
        let swarm = self.swarm.take().unwrap();
        let mut swarm = swarm.lock().unwrap();
        swarm.cancel(self.id);
    }
}

/// A `bitswap` sync query.
pub struct SyncQuery<P: StoreParams> {
    swarm: Option<Arc<Mutex<Swarm<NetworkBackendBehaviour<P>>>>>,
    id: QueryId,
    rx: SyncChannel,
}

impl<P: StoreParams> Future for SyncQuery<P> {
    type Output = Result<()>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        loop {
            match Pin::new(&mut self.rx).poll_next(cx) {
                Poll::Ready(Some(SyncEvent::Complete(result))) => return Poll::Ready(result),
                Poll::Ready(_) => continue,
                Poll::Pending => return Poll::Pending,
            }
        }
    }
}

impl<P: StoreParams> Stream for SyncQuery<P> {
    type Item = SyncEvent;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        Pin::new(&mut self.rx).poll_next(cx)
    }
}

impl<P: StoreParams> Drop for SyncQuery<P> {
    fn drop(&mut self) {
        let swarm = self.swarm.take().unwrap();
        let mut swarm = swarm.lock().unwrap();
        swarm.cancel(self.id);
    }
}