use crate::p2p::{MultiaddrWithPeerId, MultiaddrWithoutPeerId};
use crate::subscription::{SubscriptionFuture, SubscriptionRegistry};
use core::task::{Context, Poll};
use libp2p::core::{connection::ConnectionId, ConnectedPoint, Multiaddr, PeerId};
use libp2p::swarm::protocols_handler::{
DummyProtocolsHandler, IntoProtocolsHandler, ProtocolsHandler,
};
use libp2p::swarm::{self, NetworkBehaviour, PollParameters, Swarm};
use std::collections::{HashMap, HashSet, VecDeque};
use std::convert::{TryFrom, TryInto};
use std::time::Duration;
/// Description of one tracked connection, as yielded by `SwarmApi::connections`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Connection {
/// Address of the connection combined with the id of the peer it belongs to.
pub addr: MultiaddrWithPeerId,
/// Roundtrip time stored for the peer through `SwarmApi::set_rtt`, or `None` if none is stored.
pub rtt: Option<Duration>,
}
/// Handle returned by `SwarmApi::disconnect`. Consuming it with `Disconnector::disconnect`
/// bans and then immediately unbans the stored peer on a swarm.
pub struct Disconnector {
/// The peer that will be banned and unbanned.
peer_id: PeerId,
}
impl Disconnector {
pub fn disconnect<T: NetworkBehaviour>(self, swarm: &mut Swarm<T>)
where <<<T as NetworkBehaviour>::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InEvent: std::clone::Clone
{
Swarm::ban_peer_id(swarm, self.peer_id.clone());
Swarm::unban_peer_id(swarm, self.peer_id);
}
}
/// Shorthand for `swarm::NetworkBehaviourAction` instantiated with the input event type of
/// `SwarmApi`'s protocols handler and with `SwarmApi`'s `OutEvent`.
type NetworkBehaviourAction = swarm::NetworkBehaviourAction<<<<SwarmApi as NetworkBehaviour>::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InEvent, <SwarmApi as NetworkBehaviour>::OutEvent>;
/// Network behaviour that tracks known peers, open connections and per-peer roundtrip times,
/// and queues dial requests for the swarm.
#[derive(Debug, Default)]
pub struct SwarmApi {
/// Actions queued for the swarm; `poll` hands them out one at a time, oldest first.
events: VecDeque<NetworkBehaviourAction>,
/// Peers added through `add_peer` or seen in `inject_connection_established`.
peers: HashSet<PeerId>,
/// Subscriptions created by `connect`; finished when a connection is established or closed,
/// or when reaching the address fails.
connect_registry: SubscriptionRegistry<(), String>,
/// Maps the address of a tracked connection to the peer at that address.
connections: HashMap<MultiaddrWithoutPeerId, PeerId>,
/// Latest roundtrip time stored per peer by `set_rtt`; cleared when the peer is marked
/// disconnected.
roundtrip_times: HashMap<PeerId, Duration>,
/// Maps a peer to the addresses of its tracked connections.
connected_peers: HashMap<PeerId, Vec<MultiaddrWithoutPeerId>>,
/// Not read or written by the code visible here.
/// NOTE(review): presumably the configured bootstrap nodes -- confirm at the use sites.
pub(crate) bootstrappers: HashSet<MultiaddrWithPeerId>,
}
impl SwarmApi {
    /// Records `peer_id` in the set of known peers.
    pub fn add_peer(&mut self, peer_id: PeerId) {
        self.peers.insert(peer_id);
    }

    /// Iterates over all known peers.
    pub fn peers(&self) -> impl Iterator<Item = &PeerId> {
        self.peers.iter()
    }

    /// Forgets `peer_id`; a peer that was not known is silently ignored.
    pub fn remove_peer(&mut self, peer_id: &PeerId) {
        self.peers.remove(peer_id);
    }

    /// Yields one [`Connection`] per peer that has at least one tracked address.
    ///
    /// Only the first tracked address of each peer is reported, together with the roundtrip
    /// time stored for that peer (if any).
    pub fn connections(&self) -> impl Iterator<Item = Connection> + '_ {
        self.connected_peers.iter().filter_map(move |(peer, addrs)| {
            addrs.first().map(|first| Connection {
                addr: MultiaddrWithPeerId::from((first.clone(), peer.clone())),
                rtt: self.roundtrip_times.get(peer).cloned(),
            })
        })
    }

    /// Stores `rtt` as the roundtrip time of `peer_id`, replacing any earlier value.
    pub fn set_rtt(&mut self, peer_id: &PeerId, rtt: Duration) {
        self.roundtrip_times.insert(peer_id.clone(), rtt);
    }

    /// Requests a dial to `addr`.
    ///
    /// Returns `None` without doing anything when a connection to the address is already
    /// tracked. Otherwise a subscription keyed by the full address is created, a `DialAddress`
    /// action is queued for the swarm and the subscription is returned.
    pub fn connect(&mut self, addr: MultiaddrWithPeerId) -> Option<SubscriptionFuture<(), String>> {
        let already_connected = self.connections.contains_key(&addr.multiaddr);
        if already_connected {
            return None;
        }

        trace!("Connecting to {:?}", addr);

        let subscription = self
            .connect_registry
            .create_subscription(addr.clone().into(), None);

        // Only the address part is handed to the swarm; the peer id is dropped here.
        let dial = NetworkBehaviourAction::DialAddress {
            address: addr.multiaddr.into(),
        };
        self.events.push_back(dial);

        Some(subscription)
    }

    /// Stops tracking the peer connected at `addr` and returns a [`Disconnector`] for it, or
    /// `None` when no connection to that address is tracked.
    ///
    /// The swarm is not touched here; that is left to the returned `Disconnector`.
    pub fn disconnect(&mut self, addr: MultiaddrWithPeerId) -> Option<Disconnector> {
        trace!("disconnect {}", addr);

        let peer_id = self.connections.remove(&addr.multiaddr)?;
        self.mark_disconnected(&peer_id);
        Some(Disconnector { peer_id })
    }

    /// Drops all connection bookkeeping for `peer_id`: its tracked addresses, their entries in
    /// the address map and its stored roundtrip time.
    fn mark_disconnected(&mut self, peer_id: &PeerId) {
        if let Some(addresses) = self.connected_peers.remove(peer_id) {
            for address in &addresses {
                self.connections.remove(address);
            }
        }
        self.roundtrip_times.remove(peer_id);
    }
}
impl NetworkBehaviour for SwarmApi {
type ProtocolsHandler = DummyProtocolsHandler;
type OutEvent = void::Void;
fn new_handler(&mut self) -> Self::ProtocolsHandler {
trace!("new_handler");
Default::default()
}
fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec<Multiaddr> {
trace!("addresses_of_peer {}", peer_id);
self.connected_peers
.get(peer_id)
.cloned()
.map(|addrs| addrs.into_iter().map(From::from).collect())
.unwrap_or_default()
}
fn inject_connection_established(
&mut self,
peer_id: &PeerId,
_id: &ConnectionId,
cp: &ConnectedPoint,
) {
trace!("inject_connected {} {:?}", peer_id, cp);
let addr: MultiaddrWithoutPeerId = connection_point_addr(cp).to_owned().try_into().unwrap();
self.peers.insert(peer_id.clone());
let connections = self.connected_peers.entry(peer_id.clone()).or_default();
connections.push(addr.clone());
self.connections.insert(addr.clone(), peer_id.clone());
if let ConnectedPoint::Dialer { .. } = cp {
let addr = MultiaddrWithPeerId {
multiaddr: addr,
peer_id: peer_id.clone(),
};
self.connect_registry
.finish_subscription(addr.into(), Ok(()));
}
}
fn inject_connected(&mut self, _peer_id: &PeerId) {
}
fn inject_connection_closed(
&mut self,
peer_id: &PeerId,
_id: &ConnectionId,
cp: &ConnectedPoint,
) {
trace!("inject_connection_closed {} {:?}", peer_id, cp);
let closed_addr = connection_point_addr(cp).to_owned().try_into().unwrap();
let became_empty = if let Some(connections) = self.connected_peers.get_mut(peer_id) {
if let Some(index) = connections.iter().position(|addr| *addr == closed_addr) {
connections.swap_remove(index);
}
connections.is_empty()
} else {
false
};
if became_empty {
self.connected_peers.remove(peer_id);
}
self.connections.remove(&closed_addr);
if let ConnectedPoint::Dialer { .. } = cp {
let addr = MultiaddrWithPeerId::from((closed_addr, peer_id.to_owned()));
self.connect_registry
.finish_subscription(addr.into(), Err("Connection reset by peer".to_owned()));
}
}
fn inject_disconnected(&mut self, peer_id: &PeerId) {
trace!("inject_disconnected: {}", peer_id);
self.mark_disconnected(peer_id);
}
fn inject_event(&mut self, _peer_id: PeerId, _connection: ConnectionId, _event: void::Void) {}
fn inject_addr_reach_failure(
&mut self,
peer_id: Option<&PeerId>,
addr: &Multiaddr,
error: &dyn std::error::Error,
) {
trace!("inject_addr_reach_failure {} {}", addr, error);
if let Some(peer_id) = peer_id {
let addr: MultiaddrWithPeerId = if let Ok(addr) = addr.to_owned().try_into() {
addr
} else {
(
MultiaddrWithoutPeerId::try_from(addr.to_owned()).unwrap(),
peer_id.to_owned(),
)
.into()
};
self.connect_registry
.finish_subscription(addr.into(), Err(error.to_string()));
}
}
fn poll(
&mut self,
_: &mut Context,
_: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction> {
if let Some(event) = self.events.pop_front() {
Poll::Ready(event)
} else {
Poll::Pending
}
}
}
/// Borrows the address stored in a `ConnectedPoint`: the `address` of a `Dialer`, or the
/// `send_back_addr` of a `Listener`.
fn connection_point_addr(cp: &ConnectedPoint) -> &Multiaddr {
    match *cp {
        ConnectedPoint::Dialer { ref address } => address,
        ConnectedPoint::Listener {
            ref send_back_addr, ..
        } => send_back_addr,
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::p2p::transport::{build_transport, TTransport};
use libp2p::identity::Keypair;
use libp2p::{multiaddr::Protocol, multihash::Multihash, swarm::Swarm};
use std::convert::TryInto;
// Smoke test: the second swarm connects to every listen address the first swarm reports and
// expects each returned connect subscription to resolve successfully.
//
// NOTE(review): neither swarm is explicitly polled in this test, so it is worth confirming
// that `Swarm::listeners` yields any address at this point and that the loop body runs.
#[tokio::test(max_threads = 1)]
async fn swarm_api() {
let (peer1_id, trans) = mk_transport();
let mut swarm1 = Swarm::new(trans, SwarmApi::default(), peer1_id.clone());
let (peer2_id, trans) = mk_transport();
let mut swarm2 = Swarm::new(trans, SwarmApi::default(), peer2_id);
// Port 0 in the listen address: no fixed port is requested.
Swarm::listen_on(&mut swarm1, "/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap();
for l in Swarm::listeners(&swarm1) {
// Append the first peer's id so the address converts into the type `connect` takes.
let mut addr = l.to_owned();
addr.push(Protocol::P2p(
Multihash::from_bytes(peer1_id.clone().into_bytes()).unwrap(),
));
// `connect` returns `None` when a connection to the address is already tracked.
if let Some(fut) = swarm2.connect(addr.try_into().unwrap()) {
fut.await.unwrap();
}
}
}
// Generates a fresh ed25519 keypair and returns the derived peer id together with a transport
// built from that key.
fn mk_transport() -> (PeerId, TTransport) {
let key = Keypair::generate_ed25519();
let peer_id = key.public().into_peer_id();
let transport = build_transport(key).unwrap();
(peer_id, transport)
}
}