ipfs-embed 0.19.0

small embeddable ipfs implementation
Documentation
use anyhow::Result;
use fnv::FnvHashMap;
use futures::channel::mpsc;
use futures::stream::Stream;
use lazy_static::lazy_static;
use libp2p::core::connection::{ConnectedPoint, ConnectionId};
use libp2p::identify::IdentifyInfo;
use libp2p::swarm::protocols_handler::DummyProtocolsHandler;
use libp2p::swarm::{NetworkBehaviour, NetworkBehaviourAction, PollParameters};
use libp2p::{Multiaddr, PeerId};
use prometheus::{IntCounter, IntGauge, Registry};
use std::task::{Context, Poll};
use std::time::Duration;

#[derive(Clone, Debug, Eq, PartialEq)]
pub enum Event {
    NewListenAddr(Multiaddr),
    ExpiredListenAddr(Multiaddr),
    NewExternalAddr(Multiaddr),
    Discovered(PeerId),
    Connected(PeerId),
    Disconnected(PeerId),
}

#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct PeerInfo {
    protocol_version: Option<String>,
    agent_version: Option<String>,
    protocols: Vec<String>,
    addresses: FnvHashMap<Multiaddr, AddressSource>,
    rtt: Option<Duration>,
}

impl PeerInfo {
    pub fn protocol_version(&self) -> Option<&str> {
        self.protocol_version.as_deref()
    }

    pub fn agent_version(&self) -> Option<&str> {
        self.agent_version.as_deref()
    }

    pub fn protocols(&self) -> impl Iterator<Item = &str> + '_ {
        self.protocols.iter().map(|s| &**s)
    }

    pub fn addresses(&self) -> impl Iterator<Item = (&Multiaddr, AddressSource)> + '_ {
        self.addresses.iter().map(|(addr, source)| (addr, *source))
    }

    pub fn rtt(&self) -> Option<Duration> {
        self.rtt
    }
}

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum AddressSource {
    Mdns,
    Kad,
    Peer,
    User,
}

lazy_static! {
    pub static ref CONNECTED: IntGauge =
        IntGauge::new("peers_connected", "Number of connected peers.").unwrap();
    pub static ref DISCOVERED: IntCounter =
        IntCounter::new("peers_discovered_total", "Number of discovered peers.").unwrap();
    pub static ref LISTENERS: IntGauge =
        IntGauge::new("peers_listeners", "Number of listeners.").unwrap();
    pub static ref EXTERNAL_ADDRS: IntCounter = IntCounter::new(
        "peers_external_addrs_total",
        "Number of external addresses.",
    )
    .unwrap();
}

#[derive(Debug)]
pub struct AddressBook {
    local_peer_id: PeerId,
    peers: FnvHashMap<PeerId, PeerInfo>,
    connections: FnvHashMap<PeerId, Multiaddr>,
    event_stream: Vec<mpsc::UnboundedSender<Event>>,
}

impl AddressBook {
    pub fn new(local_peer_id: PeerId) -> Self {
        Self {
            local_peer_id,
            peers: Default::default(),
            connections: Default::default(),
            event_stream: Default::default(),
        }
    }

    pub fn local_peer_id(&self) -> &PeerId {
        &self.local_peer_id
    }

    pub fn add_address(&mut self, peer: &PeerId, address: Multiaddr, source: AddressSource) {
        if peer == self.local_peer_id() {
            return;
        }
        tracing::trace!(
            "adding address {} for peer {} from {:?}",
            address,
            peer,
            source
        );
        let info = self.peers.entry(*peer).or_default();
        info.addresses.insert(address, source);
        if !self.is_connected(peer) {
            self.notify(Event::Discovered(*peer));
        }
    }

    pub fn remove_address(&mut self, peer: &PeerId, address: &Multiaddr) {
        if let Some(info) = self.peers.get_mut(peer) {
            tracing::trace!("removing address {} for peer {}", address, peer);
            info.addresses.remove(address);
        }
    }

    pub fn peers(&self) -> impl Iterator<Item = &PeerId> + '_ {
        self.peers.keys()
    }

    pub fn connections(&self) -> impl Iterator<Item = (&PeerId, &Multiaddr)> + '_ {
        self.connections.iter().map(|(peer, addr)| (peer, addr))
    }

    pub fn is_connected(&self, peer: &PeerId) -> bool {
        self.connections.contains_key(peer) || peer == self.local_peer_id()
    }

    pub fn info(&self, peer_id: &PeerId) -> Option<&PeerInfo> {
        self.peers.get(peer_id)
    }

    pub fn set_rtt(&mut self, peer_id: &PeerId, rtt: Option<Duration>) {
        if let Some(info) = self.peers.get_mut(peer_id) {
            info.rtt = rtt;
        }
    }

    pub fn set_info(&mut self, peer_id: &PeerId, identify: IdentifyInfo) {
        if let Some(info) = self.peers.get_mut(peer_id) {
            info.protocol_version = Some(identify.protocol_version);
            info.agent_version = Some(identify.agent_version);
            info.protocols = identify.protocols;
        }
    }

    pub fn event_stream(&mut self) -> impl Stream<Item = Event> {
        let (tx, rx) = mpsc::unbounded();
        self.event_stream.push(tx);
        rx
    }

    pub fn notify(&mut self, event: Event) {
        self.event_stream
            .retain(|tx| tx.unbounded_send(event.clone()).is_ok());
    }

    pub fn register_metrics(&self, registry: &Registry) -> Result<()> {
        registry.register(Box::new(CONNECTED.clone()))?;
        registry.register(Box::new(DISCOVERED.clone()))?;
        registry.register(Box::new(LISTENERS.clone()))?;
        registry.register(Box::new(EXTERNAL_ADDRS.clone()))?;
        Ok(())
    }
}

impl NetworkBehaviour for AddressBook {
    type ProtocolsHandler = DummyProtocolsHandler;
    type OutEvent = void::Void;

    fn new_handler(&mut self) -> Self::ProtocolsHandler {
        Default::default()
    }

    fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec<Multiaddr> {
        if let Some(info) = self.peers.get(peer_id) {
            info.addresses().map(|(addr, _)| addr.clone()).collect()
        } else {
            vec![]
        }
    }

    fn inject_connected(&mut self, peer_id: &PeerId) {
        tracing::trace!("connected to {}", peer_id);
        CONNECTED.inc();
        self.notify(Event::Connected(*peer_id));
    }

    fn inject_disconnected(&mut self, peer_id: &PeerId) {
        tracing::trace!("disconnected from {}", peer_id);
        CONNECTED.dec();
        self.notify(Event::Disconnected(*peer_id));
    }

    fn inject_event(&mut self, _peer_id: PeerId, _connection: ConnectionId, _event: void::Void) {}

    fn poll(
        &mut self,
        _cx: &mut Context,
        _params: &mut impl PollParameters,
    ) -> Poll<NetworkBehaviourAction<void::Void, void::Void>> {
        Poll::Pending
    }

    fn inject_connection_established(
        &mut self,
        peer_id: &PeerId,
        _: &ConnectionId,
        conn: &ConnectedPoint,
    ) {
        self.add_address(
            peer_id,
            conn.get_remote_address().clone(),
            AddressSource::Peer,
        );
        self.connections
            .insert(*peer_id, conn.get_remote_address().clone());
    }

    fn inject_address_change(
        &mut self,
        peer_id: &PeerId,
        _: &ConnectionId,
        _old: &ConnectedPoint,
        new: &ConnectedPoint,
    ) {
        self.add_address(
            peer_id,
            new.get_remote_address().clone(),
            AddressSource::Peer,
        );
        self.connections
            .insert(*peer_id, new.get_remote_address().clone());
    }

    fn inject_connection_closed(
        &mut self,
        peer_id: &PeerId,
        _: &ConnectionId,
        _conn: &ConnectedPoint,
    ) {
        self.connections.remove(&peer_id);
    }

    fn inject_addr_reach_failure(
        &mut self,
        peer_id: Option<&PeerId>,
        addr: &Multiaddr,
        error: &dyn std::error::Error,
    ) {
        if let Some(peer_id) = peer_id {
            tracing::trace!("inject_addr_reach_failure {}", error);
            self.remove_address(peer_id, addr);
        }
    }

    fn inject_dial_failure(&mut self, peer_id: &PeerId) {
        tracing::trace!("dial failure {}", peer_id);
        self.peers.remove(peer_id);
    }

    fn inject_new_listen_addr(&mut self, addr: &Multiaddr) {
        tracing::trace!("new listen addr {}", addr);
        LISTENERS.inc();
        self.notify(Event::NewListenAddr(addr.clone()));
    }

    fn inject_expired_listen_addr(&mut self, addr: &Multiaddr) {
        tracing::trace!("expired listen addr {}", addr);
        LISTENERS.dec();
        self.notify(Event::ExpiredListenAddr(addr.clone()));
    }

    fn inject_new_external_addr(&mut self, addr: &Multiaddr) {
        tracing::trace!("new external addr {}", addr);
        EXTERNAL_ADDRS.inc();
        self.notify(Event::NewExternalAddr(addr.clone()));
    }
}