use crate::muxing::StreamMuxer;
use crate::{
Endpoint, Multiaddr, PeerId,
nodes::{
collection::{
CollectionEvent,
CollectionNodeAccept,
CollectionReachEvent,
CollectionStream,
PeerMut as CollecPeerMut,
ReachAttemptId
},
handled_node::{
HandledNodeError,
NodeHandler
},
node::Substream
},
nodes::listeners::{ListenersEvent, ListenersStream},
transport::Transport
};
use fnv::FnvHashMap;
use futures::{prelude::*, future};
use std::{
collections::hash_map::{Entry, OccupiedEntry},
error,
fmt,
io::{Error as IoError, ErrorKind as IoErrorKind}
};
/// State of the swarm: the listeners, the collection of nodes and the
/// bookkeeping of the reach attempts that are in progress.
#[derive(Debug)]
pub struct RawSwarm<TTrans, TInEvent, TOutEvent, THandler, THandlerErr>
where
    TTrans: Transport,
{
    /// Stream of the listeners started through `listen_on`.
    listeners: ListenersStream<TTrans>,

    /// Collection holding the nodes and the pending reach attempts.
    active_nodes: CollectionStream<TInEvent, TOutEvent, THandler, RawSwarmReachError, THandlerErr>,

    /// Bookkeeping that associates reach attempts and connected peers with
    /// their endpoints. Kept in its own struct so that it can be borrowed
    /// independently of `active_nodes`.
    reach_attempts: ReachAttempts,
}
/// Bookkeeping of the reach attempts and of the endpoints of connected peers.
#[derive(Debug)]
struct ReachAttempts {
    /// Peer ID of the local node, as passed to `RawSwarm::new`.
    local_peer_id: PeerId,

    /// Outgoing attempts towards a peer whose ID is expected in advance,
    /// keyed by that peer ID.
    out_reach_attempts: FnvHashMap<PeerId, OutReachAttempt>,

    /// Reach attempts whose peer ID isn't known in advance: accepted incoming
    /// connections, and outgoing connections started with `RawSwarm::dial`.
    other_reach_attempts: Vec<(ReachAttemptId, ConnectedPoint)>,

    /// For each peer we are connected to, the endpoint of that connection.
    connected_points: FnvHashMap<PeerId, ConnectedPoint>,
}
/// An outgoing attempt to reach a specific peer, possibly through several
/// addresses tried one after the other.
#[derive(Debug, Clone)]
struct OutReachAttempt {
    /// Identifier of the reach attempt inside the collection.
    id: ReachAttemptId,

    /// Address currently being dialed.
    cur_attempted: Multiaddr,

    /// Addresses to try next, in order, if the current one fails.
    next_attempts: Vec<Multiaddr>,
}
/// Event produced by `RawSwarm::poll`.
pub enum RawSwarmEvent<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr>
where
    TTrans: Transport + 'a,
    TInEvent: 'a,
    TOutEvent: 'a,
    THandler: 'a,
    THandlerErr: 'a,
{
    /// One of the listeners has closed.
    ListenerClosed {
        /// Address the listener was listening on.
        listen_addr: Multiaddr,
        /// The listener that closed.
        listener: TTrans::Listener,
        /// Result reported by the listeners stream when the listener closed.
        result: Result<(), <TTrans::Listener as Stream>::Error>,
    },

    /// A connection arrived on one of the listeners. Call `accept` on the
    /// contained object to register a reach attempt for it.
    IncomingConnection(IncomingConnectionEvent<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr>),

    /// An accepted incoming connection failed to be reached, or was refused
    /// because an outgoing connection to the same peer takes priority.
    IncomingConnectionError {
        /// Address of the listener that received the connection.
        listen_addr: Multiaddr,
        /// Address used to send back data to the remote.
        send_back_addr: Multiaddr,
        /// The error that happened.
        error: IoError,
    },

    /// A new connection to a peer has been opened.
    Connected {
        /// Id of the peer.
        peer_id: PeerId,
        /// Endpoint of the new connection.
        endpoint: ConnectedPoint,
    },

    /// A connection to a peer has been replaced with a new one.
    Replaced {
        /// Id of the peer.
        peer_id: PeerId,
        /// Endpoint of the connection that was replaced.
        closed_endpoint: ConnectedPoint,
        /// Endpoint of the connection that took its place.
        endpoint: ConnectedPoint,
    },

    /// A connection to a node has been closed.
    NodeClosed {
        /// Id of the peer.
        peer_id: PeerId,
        /// Endpoint of the connection that was closed.
        endpoint: ConnectedPoint,
    },

    /// The handler of a node has produced an error.
    NodeError {
        /// Id of the peer.
        peer_id: PeerId,
        /// Endpoint of the connection that errored.
        endpoint: ConnectedPoint,
        /// The error that happened.
        error: HandledNodeError<THandlerErr>,
    },

    /// Failed to reach a peer that we were trying to dial.
    DialError {
        /// Number of addresses that were still queued for this peer when the
        /// attempt failed.
        remain_addrs_attempt: usize,
        /// Id of the peer we were trying to dial.
        peer_id: PeerId,
        /// The address that failed.
        multiaddr: Multiaddr,
        /// The error that happened.
        error: RawSwarmReachError,
    },

    /// Failed to reach an address dialed through `RawSwarm::dial`, for which
    /// no peer ID was expected.
    UnknownPeerDialError {
        /// The address that failed.
        multiaddr: Multiaddr,
        /// The error that happened.
        error: IoError,
        /// The handler that was passed to `dial()`, given back to the caller.
        handler: THandler,
    },

    /// A node produced a custom event.
    NodeEvent {
        /// Id of the node that produced the event.
        peer_id: PeerId,
        /// Event that was produced by the node.
        event: TOutEvent,
    },
}
impl<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr> fmt::Debug for RawSwarmEvent<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr>
where
TOutEvent: fmt::Debug,
TTrans: Transport,
THandlerErr: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
match *self {
RawSwarmEvent::ListenerClosed { ref listen_addr, listener: _, ref result } => {
f.debug_struct("ListenerClosed")
.field("listen_addr", listen_addr)
.field("result", result)
.finish()
}
RawSwarmEvent::IncomingConnection( IncomingConnectionEvent { ref listen_addr, ref send_back_addr, .. } ) => {
f.debug_struct("IncomingConnection")
.field("listen_addr", listen_addr)
.field("send_back_addr", send_back_addr)
.finish()
}
RawSwarmEvent::IncomingConnectionError { ref listen_addr, ref send_back_addr, ref error} => {
f.debug_struct("IncomingConnectionError")
.field("listen_addr", listen_addr)
.field("send_back_addr", send_back_addr)
.field("error", error)
.finish()
}
RawSwarmEvent::Connected { ref peer_id, ref endpoint } => {
f.debug_struct("Connected")
.field("peer_id", peer_id)
.field("endpoint", endpoint)
.finish()
}
RawSwarmEvent::Replaced { ref peer_id, ref closed_endpoint, ref endpoint } => {
f.debug_struct("Replaced")
.field("peer_id", peer_id)
.field("closed_endpoint", closed_endpoint)
.field("endpoint", endpoint)
.finish()
}
RawSwarmEvent::NodeClosed { ref peer_id, ref endpoint } => {
f.debug_struct("NodeClosed")
.field("peer_id", peer_id)
.field("endpoint", endpoint)
.finish()
}
RawSwarmEvent::NodeError { ref peer_id, ref endpoint, ref error } => {
f.debug_struct("NodeError")
.field("peer_id", peer_id)
.field("endpoint", endpoint)
.field("error", error)
.finish()
}
RawSwarmEvent::DialError { ref remain_addrs_attempt, ref peer_id, ref multiaddr, ref error } => {
f.debug_struct("DialError")
.field("remain_addrs_attempt", remain_addrs_attempt)
.field("peer_id", peer_id)
.field("multiaddr", multiaddr)
.field("error", error)
.finish()
}
RawSwarmEvent::UnknownPeerDialError { ref multiaddr, ref error, .. } => {
f.debug_struct("UnknownPeerDialError")
.field("multiaddr", multiaddr)
.field("error", error)
.finish()
}
RawSwarmEvent::NodeEvent { ref peer_id, ref event } => {
f.debug_struct("NodeEvent")
.field("peer_id", peer_id)
.field("event", event)
.finish()
}
}
}
}
/// Error that can happen when trying to reach a node.
#[derive(Debug)]
pub enum RawSwarmReachError {
    /// The transport reported an error while reaching the node.
    Transport(IoError),

    /// The node was reached, but its peer ID is not the one that was expected.
    PeerIdMismatch {
        /// The peer ID that the node actually reported.
        obtained: PeerId,
    },
}
impl fmt::Display for RawSwarmReachError {
    /// Transport errors are printed as the underlying error; a mismatch prints
    /// the base58 form of the peer ID that was obtained.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let obtained = match self {
            RawSwarmReachError::Transport(err) => return write!(f, "{}", err),
            RawSwarmReachError::PeerIdMismatch { obtained } => obtained,
        };
        write!(f, "Peer ID mismatch, obtained: {}", obtained.to_base58())
    }
}
impl error::Error for RawSwarmReachError {
    /// Only the `Transport` variant wraps an underlying error.
    fn source(&self) -> Option<&(dyn error::Error + 'static)> {
        if let RawSwarmReachError::Transport(err) = self {
            Some(err)
        } else {
            None
        }
    }
}
/// A connection that arrived on a listener and has not been accepted yet.
pub struct IncomingConnectionEvent<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr>
where
    TTrans: Transport + 'a,
    TInEvent: 'a,
    TOutEvent: 'a,
    THandler: 'a,
    THandlerErr: 'a,
{
    /// The pending upgrade produced by the listener.
    upgrade: TTrans::ListenerUpgrade,

    /// Address of the listener that received the connection.
    listen_addr: Multiaddr,

    /// Address used to send back data to the remote.
    send_back_addr: Multiaddr,

    /// Collection in which the reach attempt gets registered on accept.
    active_nodes: &'a mut CollectionStream<TInEvent, TOutEvent, THandler, RawSwarmReachError, THandlerErr>,

    /// List in which the reach attempt and its endpoint get recorded on accept.
    other_reach_attempts: &'a mut Vec<(ReachAttemptId, ConnectedPoint)>,
}
impl<'a, TTrans, TInEvent, TOutEvent, TMuxer, THandler, THandlerErr>
    IncomingConnectionEvent<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr>
where
    TTrans: Transport<Output = (PeerId, TMuxer)>,
    TTrans::ListenerUpgrade: Send + 'static,
    THandler: NodeHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static,
    THandler::OutboundOpenInfo: Send + 'static,
    THandlerErr: error::Error + Send + 'static,
    TMuxer: StreamMuxer + Send + Sync + 'static,
    TMuxer::OutboundSubstream: Send,
    TMuxer::Substream: Send,
    TInEvent: Send + 'static,
    TOutEvent: Send + 'static,
{
    /// Accepts the connection, using `handler` for the node.
    #[inline]
    pub fn accept(self, handler: THandler) {
        self.accept_with_builder(move |_endpoint| handler)
    }

    /// Accepts the connection, building the handler from the endpoint of the
    /// connection.
    ///
    /// The upgrade is registered as a reach attempt in the collection and
    /// recorded, with its endpoint, in the list of attempts that have no
    /// expected peer ID.
    pub fn accept_with_builder<TBuilder>(self, builder: TBuilder)
    where
        TBuilder: FnOnce(&ConnectedPoint) -> THandler,
    {
        let endpoint = self.to_connected_point();
        let handler = builder(&endpoint);
        let upgrade = self.upgrade.map_err(RawSwarmReachError::Transport);
        let reach_id = self.active_nodes.add_reach_attempt(upgrade, handler);
        self.other_reach_attempts.push((reach_id, endpoint));
    }
}
impl<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr> IncomingConnectionEvent<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr>
where
    TTrans: Transport,
{
    /// Address of the listener that received the connection.
    #[inline]
    pub fn listen_addr(&self) -> &Multiaddr {
        &self.listen_addr
    }

    /// Address used to send back data to the dialer.
    #[inline]
    pub fn send_back_addr(&self) -> &Multiaddr {
        &self.send_back_addr
    }

    /// Builds the `ConnectedPoint` (always the `Listener` variant) that
    /// describes this incoming connection.
    #[inline]
    pub fn to_connected_point(&self) -> ConnectedPoint {
        let listen_addr = self.listen_addr.clone();
        let send_back_addr = self.send_back_addr.clone();
        ConnectedPoint::Listener { listen_addr, send_back_addr }
    }
}
/// How we are connected to a node: as the dialer or as the listener.
#[derive(Debug, Clone)]
pub enum ConnectedPoint {
    /// We dialed the node.
    Dialer {
        /// Address that was dialed.
        address: Multiaddr,
    },

    /// We received the connection on a listener.
    Listener {
        /// Address of the listener that received the connection.
        listen_addr: Multiaddr,
        /// Address used to send back data to the remote.
        send_back_addr: Multiaddr,
    },
}
impl<'a> From<&'a ConnectedPoint> for Endpoint {
#[inline]
fn from(endpoint: &'a ConnectedPoint) -> Endpoint {
endpoint.to_endpoint()
}
}
impl From<ConnectedPoint> for Endpoint {
#[inline]
fn from(endpoint: ConnectedPoint) -> Endpoint {
endpoint.to_endpoint()
}
}
impl ConnectedPoint {
    /// Turns the `ConnectedPoint` into the corresponding `Endpoint`.
    #[inline]
    pub fn to_endpoint(&self) -> Endpoint {
        match self {
            ConnectedPoint::Dialer { .. } => Endpoint::Dialer,
            ConnectedPoint::Listener { .. } => Endpoint::Listener,
        }
    }

    /// Returns true if we are the dialer.
    #[inline]
    pub fn is_dialer(&self) -> bool {
        if let ConnectedPoint::Dialer { .. } = self {
            true
        } else {
            false
        }
    }

    /// Returns true if we are the listener.
    #[inline]
    pub fn is_listener(&self) -> bool {
        if let ConnectedPoint::Listener { .. } = self {
            true
        } else {
            false
        }
    }
}
impl<TTrans, TInEvent, TOutEvent, TMuxer, THandler, THandlerErr>
    RawSwarm<TTrans, TInEvent, TOutEvent, THandler, THandlerErr>
where
    TTrans: Transport + Clone,
    TMuxer: StreamMuxer,
    THandler: NodeHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static,
    THandler::OutboundOpenInfo: Send + 'static,
    THandlerErr: error::Error + Send + 'static,
{
    /// Creates a new swarm with no listener, no node and no pending attempt.
    #[inline]
    pub fn new(transport: TTrans, local_peer_id: PeerId) -> Self {
        RawSwarm {
            listeners: ListenersStream::new(transport),
            active_nodes: CollectionStream::new(),
            reach_attempts: ReachAttempts {
                local_peer_id,
                out_reach_attempts: Default::default(),
                other_reach_attempts: Vec::new(),
                connected_points: Default::default(),
            },
        }
    }

    /// Returns the transport passed when building this object.
    #[inline]
    pub fn transport(&self) -> &TTrans {
        self.listeners.transport()
    }

    /// Starts listening on the given multiaddress.
    ///
    /// The call is forwarded to the listeners stream and its result returned
    /// unchanged.
    // NOTE(review): presumably `Ok` carries the address actually listened on
    // and `Err` gives the address back when it is unsupported — confirm
    // against `ListenersStream::listen_on`.
    #[inline]
    pub fn listen_on(&mut self, addr: Multiaddr) -> Result<Multiaddr, Multiaddr> {
        self.listeners.listen_on(addr)
    }

    /// Returns an iterator over the addresses we are listening on.
    #[inline]
    pub fn listeners(&self) -> impl Iterator<Item = &Multiaddr> {
        self.listeners.listeners()
    }

    /// Calls the transport's `nat_traversal` with every listened address and
    /// `observed_addr`, and yields all the addresses it produces.
    #[inline]
    pub fn nat_traversal<'a>(
        &'a self,
        observed_addr: &'a Multiaddr,
    ) -> impl Iterator<Item = Multiaddr> + 'a
    where
        TMuxer: 'a,
        THandler: 'a,
    {
        self.listeners()
            .flat_map(move |server| self.transport().nat_traversal(server, observed_addr))
    }

    /// Returns the peer ID of the local node.
    #[inline]
    pub fn local_peer_id(&self) -> &PeerId {
        &self.reach_attempts.local_peer_id
    }

    /// Dials a multiaddress without expecting any particular peer ID.
    ///
    /// The attempt is recorded with a `Dialer` endpoint among the attempts
    /// that have no expected peer ID.
    ///
    /// # Errors
    ///
    /// Gives the address back if the transport refuses to dial it.
    pub fn dial(&mut self, addr: Multiaddr, handler: THandler) -> Result<(), Multiaddr>
    where
        TTrans: Transport<Output = (PeerId, TMuxer)>,
        TTrans::Dial: Send + 'static,
        TMuxer: StreamMuxer + Send + Sync + 'static,
        TMuxer::OutboundSubstream: Send,
        TMuxer::Substream: Send,
        TInEvent: Send + 'static,
        TOutEvent: Send + 'static,
    {
        let future = match self.transport().clone().dial(addr.clone()) {
            Ok(fut) => fut,
            Err((_, addr)) => return Err(addr),
        };

        let connected_point = ConnectedPoint::Dialer { address: addr };
        let reach_id = self
            .active_nodes
            .add_reach_attempt(future.map_err(RawSwarmReachError::Transport), handler);
        self.reach_attempts.other_reach_attempts.push((reach_id, connected_point));
        Ok(())
    }

    /// Returns the number of accepted incoming connections whose reach attempt
    /// is still pending.
    #[inline]
    pub fn num_incoming_negotiated(&self) -> usize {
        self.reach_attempts.other_reach_attempts
            .iter()
            .filter(|&(_, endpoint)| endpoint.is_listener())
            .count()
    }

    /// Sends an event to all the nodes of the collection.
    #[inline]
    pub fn broadcast_event(&mut self, event: &TInEvent)
    where
        TInEvent: Clone,
    {
        self.active_nodes.broadcast_event(event)
    }

    /// Grants access to a struct that represents a peer, in one of three
    /// states: connected, pending outgoing connection, or not connected.
    #[inline]
    pub fn peer(&mut self, peer_id: PeerId) -> Peer<TTrans, TInEvent, TOutEvent, THandler, THandlerErr> {
        // The lookups below are done twice (once to test, once to build the
        // result) rather than reusing the first borrow; presumably this is what
        // keeps the conditional early returns acceptable to the borrow checker.
        if self.active_nodes.peer_mut(&peer_id).is_some() {
            debug_assert!(!self.reach_attempts.out_reach_attempts.contains_key(&peer_id));
            return Peer::Connected(PeerConnected {
                peer: self
                    .active_nodes
                    .peer_mut(&peer_id)
                    .expect("we checked for Some just above"),
                peer_id,
                connected_points: &mut self.reach_attempts.connected_points,
            });
        }

        if self.reach_attempts.out_reach_attempts.contains_key(&peer_id) {
            debug_assert!(!self.reach_attempts.connected_points.contains_key(&peer_id));
            return Peer::PendingConnect(PeerPendingConnect {
                attempt: match self.reach_attempts.out_reach_attempts.entry(peer_id.clone()) {
                    Entry::Occupied(e) => e,
                    Entry::Vacant(_) => panic!("we checked for Some just above"),
                },
                active_nodes: &mut self.active_nodes,
            });
        }

        debug_assert!(!self.reach_attempts.connected_points.contains_key(&peer_id));
        Peer::NotConnected(PeerNotConnected {
            nodes: self,
            peer_id,
        })
    }

    /// Starts dialing `first` in order to reach `peer_id`, keeping `rest` as
    /// the addresses to try afterwards.
    ///
    /// A reach attempt is always registered: if the transport refuses the
    /// address, the attempt is a future that immediately fails with an
    /// "unsupported multiaddr" error, so the failure goes through the same
    /// path as any other dialing error.
    ///
    /// There must be no pending outgoing attempt for `peer_id` yet (checked
    /// with a `debug_assert!`).
    fn start_dial_out(&mut self, peer_id: PeerId, handler: THandler, first: Multiaddr, rest: Vec<Multiaddr>)
    where
        TTrans: Transport<Output = (PeerId, TMuxer)>,
        TTrans::Dial: Send + 'static,
        TMuxer: StreamMuxer + Send + Sync + 'static,
        TMuxer::OutboundSubstream: Send,
        TMuxer::Substream: Send,
        TInEvent: Send + 'static,
        TOutEvent: Send + 'static,
    {
        let reach_id = match self.transport().clone().dial(first.clone()) {
            Ok(fut) => {
                let expected_peer_id = peer_id.clone();
                // Reject the connection if the remote isn't the expected peer.
                let fut = fut
                    .map_err(RawSwarmReachError::Transport)
                    .and_then(move |(actual_peer_id, muxer)| {
                        if actual_peer_id == expected_peer_id {
                            Ok((actual_peer_id, muxer))
                        } else {
                            Err(RawSwarmReachError::PeerIdMismatch { obtained: actual_peer_id })
                        }
                    });
                self.active_nodes.add_reach_attempt(fut, handler)
            },
            Err((_, addr)) => {
                let msg = format!("unsupported multiaddr {}", addr);
                let fut = future::err(RawSwarmReachError::Transport(IoError::new(IoErrorKind::Other, msg)));
                self.active_nodes.add_reach_attempt(fut, handler)
            },
        };

        let former = self.reach_attempts.out_reach_attempts.insert(
            peer_id,
            OutReachAttempt {
                id: reach_id,
                cur_attempted: first,
                next_attempts: rest,
            },
        );

        debug_assert!(former.is_none());
    }

    /// Polls the listeners, then the collection of nodes, and returns the
    /// first event that is ready.
    ///
    /// The return type is `Async` rather than `Poll`: this method has no error
    /// case of its own.
    pub fn poll(&mut self) -> Async<RawSwarmEvent<TTrans, TInEvent, TOutEvent, THandler, THandlerErr>>
    where
        TTrans: Transport<Output = (PeerId, TMuxer)>,
        TTrans::Dial: Send + 'static,
        TTrans::ListenerUpgrade: Send + 'static,
        TMuxer: StreamMuxer + Send + Sync + 'static,
        TMuxer::OutboundSubstream: Send,
        TMuxer::Substream: Send,
        TInEvent: Send + 'static,
        TOutEvent: Send + 'static,
        THandler: NodeHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static,
        THandler::OutboundOpenInfo: Send + 'static,
        THandlerErr: error::Error + Send + 'static,
    {
        // Listeners are polled first.
        match self.listeners.poll() {
            Async::NotReady => (),
            Async::Ready(ListenersEvent::Incoming { upgrade, listen_addr, send_back_addr }) => {
                let event = IncomingConnectionEvent {
                    upgrade,
                    listen_addr,
                    send_back_addr,
                    active_nodes: &mut self.active_nodes,
                    other_reach_attempts: &mut self.reach_attempts.other_reach_attempts,
                };
                return Async::Ready(RawSwarmEvent::IncomingConnection(event));
            }
            Async::Ready(ListenersEvent::Closed { listen_addr, listener, result }) => {
                return Async::Ready(RawSwarmEvent::ListenerClosed {
                    listen_addr,
                    listener,
                    result,
                });
            }
        }

        // Then the nodes. Each arm yields the event to report together with
        // the follow-up actions, which can only be performed once the borrow
        // of `active_nodes` held by the polled event is over.
        let (action, out_event) = match self.active_nodes.poll() {
            Async::NotReady => return Async::NotReady,
            Async::Ready(CollectionEvent::NodeReached(reach_event)) => {
                handle_node_reached(&mut self.reach_attempts, reach_event)
            }
            Async::Ready(CollectionEvent::ReachError { id, error, handler }) => {
                handle_reach_error(&mut self.reach_attempts, id, error, handler)
            }
            Async::Ready(CollectionEvent::NodeError { peer_id, error }) => {
                let endpoint = self.reach_attempts.connected_points.remove(&peer_id)
                    .expect("We insert into connected_points whenever a connection is \
                             opened and remove only when a connection is closed; the \
                             underlying API is guaranteed to always deliver a connection \
                             closed message after it has been opened, and no two closed \
                             messages; QED");
                debug_assert!(!self.reach_attempts.out_reach_attempts.contains_key(&peer_id));
                (Default::default(), RawSwarmEvent::NodeError { peer_id, endpoint, error })
            }
            Async::Ready(CollectionEvent::NodeClosed { peer_id }) => {
                let endpoint = self.reach_attempts.connected_points.remove(&peer_id)
                    .expect("We insert into connected_points whenever a connection is \
                             opened and remove only when a connection is closed; the \
                             underlying API is guaranteed to always deliver a connection \
                             closed message after it has been opened, and no two closed \
                             messages; QED");
                debug_assert!(!self.reach_attempts.out_reach_attempts.contains_key(&peer_id));
                (Default::default(), RawSwarmEvent::NodeClosed { peer_id, endpoint })
            }
            Async::Ready(CollectionEvent::NodeEvent { peer_id, event }) => {
                (Default::default(), RawSwarmEvent::NodeEvent { peer_id, event })
            }
        };

        if let Some((peer_id, handler, first, rest)) = action.start_dial_out {
            self.start_dial_out(peer_id, handler, first, rest);
        }

        if let Some(interrupt) = action.interrupt {
            // TODO: improve proof or remove; this is too complicated right now
            self.active_nodes
                .interrupt(interrupt)
                .expect("interrupt is guaranteed to be gathered from `out_reach_attempts`; \
                         we insert in out_reach_attempts only when we call \
                         active_nodes.add_reach_attempt, and we remove only when we call \
                         interrupt or when a reach attempt succeeds or errors; therefore the \
                         out_reach_attempts should always be in sync with the actual \
                         attempts; QED");
        }

        Async::Ready(out_event)
    }
}
/// Follow-up actions that `RawSwarm::poll` must perform after an event of the
/// collection has been handled.
#[derive(Debug)]
#[must_use]
struct ActionItem<THandler> {
    /// If set, a dial-out must be started with these parameters: peer to
    /// reach, handler, first address to try and remaining addresses.
    start_dial_out: Option<(PeerId, THandler, Multiaddr, Vec<Multiaddr>)>,

    /// If set, the reach attempt with this id must be interrupted.
    interrupt: Option<ReachAttemptId>,
}
/// Implemented by hand so that no `THandler: Default` bound is required.
impl<THandler> Default for ActionItem<THandler> {
    /// The default action item requests nothing.
    fn default() -> Self {
        let start_dial_out = None;
        let interrupt = None;
        ActionItem { start_dial_out, interrupt }
    }
}
/// Handles a node reached event from the collection.
///
/// Returns the event to report to the user of the swarm, plus the follow-up
/// actions to perform once the collection is no longer borrowed.
///
/// > **Note**: `reach_attempts` is passed on its own, rather than the whole
/// > `RawSwarm`, because of borrowing: `event` comes from the collection.
///
/// # Panics
///
/// Panics if the id of the reach attempt is found neither in
/// `other_reach_attempts` nor, for the reached peer, in `out_reach_attempts`.
fn handle_node_reached<'a, TTrans, TMuxer, TInEvent, TOutEvent, THandler, THandlerErr>(
    reach_attempts: &mut ReachAttempts,
    event: CollectionReachEvent<TInEvent, TOutEvent, THandler, RawSwarmReachError, THandlerErr>
) -> (ActionItem<THandler>, RawSwarmEvent<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr>)
where
    TTrans: Transport<Output = (PeerId, TMuxer)> + Clone,
    TMuxer: StreamMuxer + Send + Sync + 'static,
    TMuxer::OutboundSubstream: Send,
    TMuxer::Substream: Send,
    TInEvent: Send + 'static,
    TOutEvent: Send + 'static,
{
    // First possibility: the attempt had no expected peer ID (accepted
    // incoming connection, or `RawSwarm::dial`).
    let other_pos = reach_attempts
        .other_reach_attempts
        .iter()
        .position(|entry| entry.0 == event.reach_attempt_id());

    if let Some(other_pos) = other_pos {
        let (_, opened_endpoint) = reach_attempts.other_reach_attempts.swap_remove(other_pos);

        // Priority rule: when we have dialing priority over the remote and the
        // connection that would be replaced is one we dialed, an incoming
        // connection is refused. Returning here drops `event` unaccepted.
        if event.would_replace() && has_dial_prio(&reach_attempts.local_peer_id, event.peer_id()) {
            if let Some(ConnectedPoint::Dialer { .. }) = reach_attempts.connected_points.get(event.peer_id()) {
                if let ConnectedPoint::Listener { listen_addr, send_back_addr } = opened_endpoint {
                    let refused = RawSwarmEvent::IncomingConnectionError {
                        listen_addr,
                        send_back_addr,
                        error: IoError::new(IoErrorKind::PermissionDenied,
                            "refused incoming connection".to_string()),
                    };
                    return (Default::default(), refused);
                }
            }
        }

        // Record the new endpoint; the previous one, if any, is the endpoint
        // of the connection being replaced.
        let closed_endpoint = reach_attempts
            .connected_points
            .insert(event.peer_id().clone(), opened_endpoint.clone());

        // A pending outgoing attempt towards the same peer is now pointless
        // and must be interrupted by the caller.
        let action = match reach_attempts.out_reach_attempts.remove(event.peer_id()) {
            Some(attempt) => {
                debug_assert_ne!(attempt.id, event.reach_attempt_id());
                ActionItem {
                    interrupt: Some(attempt.id),
                    .. Default::default()
                }
            }
            None => ActionItem::default(),
        };

        let (outcome, peer_id) = event.accept();
        let out_event = if outcome == CollectionNodeAccept::ReplacedExisting {
            let closed_endpoint = closed_endpoint
                .expect("We insert into connected_points whenever a connection is opened and \
                         remove only when a connection is closed; the underlying API is \
                         guaranteed to always deliver a connection closed message after it has \
                         been opened, and no two closed messages; QED");
            RawSwarmEvent::Replaced {
                peer_id,
                endpoint: opened_endpoint,
                closed_endpoint,
            }
        } else {
            RawSwarmEvent::Connected { peer_id, endpoint: opened_endpoint }
        };
        return (action, out_event);
    }

    // Second possibility: the attempt is the pending outgoing attempt towards
    // the peer that has just been reached.
    let is_outgoing_and_ok = reach_attempts
        .out_reach_attempts
        .get(event.peer_id())
        .map_or(false, |attempt| attempt.id == event.reach_attempt_id());

    if !is_outgoing_and_ok {
        // We didn't find any entry in neither the outgoing connections not ingoing connections.
        panic!("The API of collection guarantees that the id sent back in NodeReached (which is where \
                we call handle_node_reached) is one that was passed to add_reach_attempt. Whenever we \
                call add_reach_attempt, we also insert at the same time an entry either in \
                out_reach_attempts or in other_reach_attempts. It is therefore guaranteed that we \
                find back this ID in either of these two sets");
    }

    let attempt = reach_attempts.out_reach_attempts.remove(event.peer_id())
        .expect("is_outgoing_and_ok is true only if reach_attempts.out_reach_attempts.get(event.peer_id()) \
                 returned Some");

    let opened_endpoint = ConnectedPoint::Dialer {
        address: attempt.cur_attempted,
    };

    let closed_endpoint = reach_attempts
        .connected_points
        .insert(event.peer_id().clone(), opened_endpoint.clone());

    let (outcome, peer_id) = event.accept();
    let out_event = if outcome == CollectionNodeAccept::ReplacedExisting {
        let closed_endpoint = closed_endpoint
            .expect("We insert into connected_points whenever a connection is opened and \
                     remove only when a connection is closed; the underlying API is guaranteed \
                     to always deliver a connection closed message after it has been opened, \
                     and no two closed messages; QED");
        RawSwarmEvent::Replaced {
            peer_id,
            endpoint: opened_endpoint,
            closed_endpoint,
        }
    } else {
        RawSwarmEvent::Connected { peer_id, endpoint: opened_endpoint }
    };
    (Default::default(), out_event)
}
/// Returns true if `local` has dialing priority over `other`, which is the
/// case when the bytes of `local` compare strictly lower than those of `other`.
#[inline]
fn has_dial_prio(local: &PeerId, other: &PeerId) -> bool {
    let local_bytes = local.as_bytes();
    let other_bytes = other.as_bytes();
    local_bytes < other_bytes
}
fn handle_reach_error<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr>(
reach_attempts: &mut ReachAttempts,
reach_id: ReachAttemptId,
error: RawSwarmReachError,
handler: THandler,
) -> (ActionItem<THandler>, RawSwarmEvent<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr>)
where TTrans: Transport
{
let out_reach_peer_id = reach_attempts
.out_reach_attempts
.iter()
.find(|(_, a)| a.id == reach_id)
.map(|(p, _)| p.clone());
if let Some(peer_id) = out_reach_peer_id {
let mut attempt = reach_attempts.out_reach_attempts.remove(&peer_id)
.expect("out_reach_peer_id is a key that is grabbed from out_reach_attempts");
let num_remain = attempt.next_attempts.len();
let failed_addr = attempt.cur_attempted.clone();
let action = if !attempt.next_attempts.is_empty() {
let mut attempt = attempt;
let next_attempt = attempt.next_attempts.remove(0);
ActionItem {
start_dial_out: Some((peer_id.clone(), handler, next_attempt, attempt.next_attempts)),
.. Default::default()
}
} else {
Default::default()
};
return (action, RawSwarmEvent::DialError {
remain_addrs_attempt: num_remain,
peer_id,
multiaddr: failed_addr,
error,
});
}
if let Some(in_pos) = reach_attempts
.other_reach_attempts
.iter()
.position(|i| i.0 == reach_id)
{
let (_, endpoint) = reach_attempts.other_reach_attempts.swap_remove(in_pos);
let error = match error {
RawSwarmReachError::Transport(err) => err,
RawSwarmReachError::PeerIdMismatch { .. } => unreachable!(), };
match endpoint {
ConnectedPoint::Dialer { address } => {
return (Default::default(), RawSwarmEvent::UnknownPeerDialError {
multiaddr: address,
error,
handler,
});
}
ConnectedPoint::Listener { listen_addr, send_back_addr } => {
return (Default::default(), RawSwarmEvent::IncomingConnectionError { listen_addr, send_back_addr, error });
}
}
}
panic!("The API of collection guarantees that the id sent back in ReachError events \
(which is where we call handle_reach_error) is one that was passed to \
add_reach_attempt. Whenever we call add_reach_attempt, we also insert \
at the same time an entry either in out_reach_attempts or in \
other_reach_attempts. It is therefore guaranteed that we find back this ID in \
either of these two sets");
}
/// State of a peer in the swarm, as returned by `RawSwarm::peer`.
pub enum Peer<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr>
where
    TTrans: Transport + 'a,
    TInEvent: 'a,
    TOutEvent: 'a,
    THandler: 'a,
    THandlerErr: 'a,
{
    /// We are connected to this peer.
    Connected(PeerConnected<'a, TInEvent>),

    /// An outgoing attempt to reach this peer is in progress.
    PendingConnect(PeerPendingConnect<'a, TInEvent, TOutEvent, THandler, THandlerErr>),

    /// We are neither connected to this peer nor trying to reach it.
    NotConnected(PeerNotConnected<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr>),
}
/// Debug output shows the state name with the peer ID or the pending attempt.
impl<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr> fmt::Debug for Peer<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr>
where
    TTrans: Transport,
{
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            Peer::Connected(connected) => {
                let mut out = f.debug_struct("Connected");
                out.field("peer_id", &connected.peer_id);
                out.field("connected_points", &connected.connected_points);
                out.finish()
            }
            Peer::PendingConnect(pending) => {
                let mut out = f.debug_struct("PendingConnect");
                out.field("attempt", &pending.attempt);
                out.finish()
            }
            Peer::NotConnected(not_connected) => {
                let mut out = f.debug_struct("NotConnected");
                out.field("peer_id", &not_connected.peer_id);
                out.finish()
            }
        }
    }
}
impl<'a, TTrans, TMuxer, TInEvent, TOutEvent, THandler, THandlerErr>
    Peer<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr>
where
    TTrans: Transport<Output = (PeerId, TMuxer)> + Clone,
    TTrans::Dial: Send + 'static,
    TMuxer: StreamMuxer + Send + Sync + 'static,
    TMuxer::OutboundSubstream: Send,
    TMuxer::Substream: Send,
    TInEvent: Send + 'static,
    TOutEvent: Send + 'static,
    THandler: NodeHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static,
    THandler::OutboundOpenInfo: Send + 'static,
    THandlerErr: error::Error + Send + 'static,
{
    /// If we are connected, returns the `PeerConnected`.
    #[inline]
    pub fn as_connected(self) -> Option<PeerConnected<'a, TInEvent>> {
        if let Peer::Connected(peer) = self {
            Some(peer)
        } else {
            None
        }
    }

    /// If a connection is pending, returns the `PeerPendingConnect`.
    #[inline]
    pub fn as_pending_connect(self) -> Option<PeerPendingConnect<'a, TInEvent, TOutEvent, THandler, THandlerErr>> {
        if let Peer::PendingConnect(peer) = self {
            Some(peer)
        } else {
            None
        }
    }

    /// If we are not connected, returns the `PeerNotConnected`.
    #[inline]
    pub fn as_not_connected(self) -> Option<PeerNotConnected<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr>> {
        if let Peer::NotConnected(peer) = self {
            Some(peer)
        } else {
            None
        }
    }

    /// If we're not connected, opens a new connection to this peer using the
    /// given multiaddr.
    ///
    /// Returns `Err` with the peer given back if connecting fails.
    #[inline]
    pub fn or_connect(self, addr: Multiaddr, handler: THandler)
        -> Result<PeerPotentialConnect<'a, TInEvent, TOutEvent, THandler, THandlerErr>, Self>
    {
        self.or_connect_with(move |_peer_id| addr, handler)
    }

    /// If we're not connected, calls `addr` with the peer ID to obtain the
    /// multiaddr and opens a new connection to it.
    ///
    /// `addr` is only called in the not-connected state. Returns `Err` with
    /// the peer given back if connecting fails.
    #[inline]
    pub fn or_connect_with<TFn>(self, addr: TFn, handler: THandler)
        -> Result<PeerPotentialConnect<'a, TInEvent, TOutEvent, THandler, THandlerErr>, Self>
    where
        TFn: FnOnce(&PeerId) -> Multiaddr,
    {
        let not_connected = match self {
            Peer::Connected(peer) => return Ok(PeerPotentialConnect::Connected(peer)),
            Peer::PendingConnect(peer) => return Ok(PeerPotentialConnect::PendingConnect(peer)),
            Peer::NotConnected(peer) => peer,
        };

        let target = addr(&not_connected.peer_id);
        not_connected
            .connect(target, handler)
            .map(PeerPotentialConnect::PendingConnect)
            .map_err(Peer::NotConnected)
    }
}
/// Peer we are potentially going to connect to: either already connected, or
/// with an outgoing connection attempt in progress.
pub enum PeerPotentialConnect<'a, TInEvent, TOutEvent, THandler, THandlerErr>
where
    TInEvent: 'a,
    TOutEvent: 'a,
    THandler: 'a,
    THandlerErr: 'a,
{
    /// We are connected to this peer.
    Connected(PeerConnected<'a, TInEvent>),

    /// An outgoing attempt to reach this peer is in progress.
    PendingConnect(PeerPendingConnect<'a, TInEvent, TOutEvent, THandler, THandlerErr>),
}
impl<'a, TInEvent, TOutEvent, THandler, THandlerErr> PeerPotentialConnect<'a, TInEvent, TOutEvent, THandler, THandlerErr> {
    /// Closes the connection if we are connected, or interrupts the attempt
    /// if one is pending.
    #[inline]
    pub fn close(self) {
        match self {
            PeerPotentialConnect::PendingConnect(pending) => pending.interrupt(),
            PeerPotentialConnect::Connected(connected) => connected.close(),
        }
    }

    /// If we are connected, returns the `PeerConnected`.
    #[inline]
    pub fn as_connected(self) -> Option<PeerConnected<'a, TInEvent>> {
        if let PeerPotentialConnect::Connected(peer) = self {
            Some(peer)
        } else {
            None
        }
    }

    /// If a connection is pending, returns the `PeerPendingConnect`.
    #[inline]
    pub fn as_pending_connect(self) -> Option<PeerPendingConnect<'a, TInEvent, TOutEvent, THandler, THandlerErr>> {
        if let PeerPotentialConnect::PendingConnect(peer) = self {
            Some(peer)
        } else {
            None
        }
    }
}
/// Access to a peer we are connected to.
pub struct PeerConnected<'a, TInEvent>
where
    TInEvent: 'a,
{
    /// Access to the peer inside the collection.
    peer: CollecPeerMut<'a, TInEvent>,

    /// Reference to the `connected_points` field of the parent.
    connected_points: &'a mut FnvHashMap<PeerId, ConnectedPoint>,

    /// Id of the peer this object gives access to.
    peer_id: PeerId,
}
impl<'a, TInEvent> PeerConnected<'a, TInEvent> {
pub fn close(self) {
self.connected_points.remove(&self.peer_id);
self.peer.close()
}
#[inline]
pub fn endpoint(&self) -> &ConnectedPoint {
self.connected_points.get(&self.peer_id)
.expect("We insert into connected_points whenever a connection is opened and remove \
only when a connection is closed; the underlying API is guaranteed to always \
deliver a connection closed message after it has been opened, and no two \
closed messages; QED")
}
#[inline]
pub fn send_event(&mut self, event: TInEvent) {
self.peer.send_event(event)
}
}
/// Access to a peer we are attempting to connect to.
#[derive(Debug)]
pub struct PeerPendingConnect<'a, TInEvent, TOutEvent, THandler, THandlerErr>
where
    TInEvent: 'a,
    TOutEvent: 'a,
    THandler: 'a,
    THandlerErr: 'a,
{
    /// Entry of the pending attempt in the parent's `out_reach_attempts`.
    attempt: OccupiedEntry<'a, PeerId, OutReachAttempt>,

    /// Collection in which the reach attempt is registered.
    active_nodes: &'a mut CollectionStream<TInEvent, TOutEvent, THandler, RawSwarmReachError, THandlerErr>,
}
impl<'a, TInEvent, TOutEvent, THandler, THandlerErr> PeerPendingConnect<'a, TInEvent, TOutEvent, THandler, THandlerErr> {
#[inline]
pub fn interrupt(self) {
let attempt = self.attempt.remove();
if self.active_nodes.interrupt(attempt.id).is_err() {
panic!("We retreived this attempt.id from out_reach_attempts. We insert in \
out_reach_attempts only at the same time as we call add_reach_attempt. \
Whenever we receive a NodeReached, NodeReplaced or ReachError event, which \
invalidate the attempt.id, we also remove the corresponding entry in \
out_reach_attempts.");
}
}
#[inline]
pub fn attempted_multiaddr(&self) -> &Multiaddr {
&self.attempt.get().cur_attempted
}
#[inline]
pub fn pending_multiaddrs(&self) -> impl Iterator<Item = &Multiaddr> {
self.attempt.get().next_attempts.iter()
}
pub fn append_multiaddr_attempt(&mut self, addr: Multiaddr) {
if self.attempt.get().next_attempts.iter().any(|a| a == &addr) {
return;
}
self.attempt.get_mut().next_attempts.push(addr);
}
}
/// Access to a peer we're not connected to.
#[derive(Debug)]
pub struct PeerNotConnected<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr>
where
    TTrans: Transport + 'a,
    TInEvent: 'a,
    TOutEvent: 'a,
    THandler: 'a,
    THandlerErr: 'a,
{
    /// Id of the peer this object refers to.
    peer_id: PeerId,

    /// The swarm this peer belongs to.
    nodes: &'a mut RawSwarm<TTrans, TInEvent, TOutEvent, THandler, THandlerErr>,
}
impl<'a, TTrans, TInEvent, TOutEvent, TMuxer, THandler, THandlerErr>
PeerNotConnected<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr>
where
TTrans: Transport<Output = (PeerId, TMuxer)> + Clone,
TTrans::Dial: Send + 'static,
TMuxer: StreamMuxer + Send + Sync + 'static,
TMuxer::OutboundSubstream: Send,
TMuxer::Substream: Send,
THandler: NodeHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static,
THandler::OutboundOpenInfo: Send + 'static, THandlerErr: error::Error + Send + 'static,
TInEvent: Send + 'static,
TOutEvent: Send + 'static,
{
#[inline]
pub fn connect(self, addr: Multiaddr, handler: THandler) -> Result<PeerPendingConnect<'a, TInEvent, TOutEvent, THandler, THandlerErr>, Self> {
self.connect_inner(handler, addr, Vec::new())
}
#[inline]
pub fn connect_iter<TIter>(self, addrs: TIter, handler: THandler)
-> Result<PeerPendingConnect<'a, TInEvent, TOutEvent, THandler, THandlerErr>, Self>
where
TIter: IntoIterator<Item = Multiaddr>,
{
let mut addrs = addrs.into_iter();
let first = match addrs.next() {
Some(f) => f,
None => return Err(self)
};
let rest = addrs.collect();
self.connect_inner(handler, first, rest)
}
fn connect_inner(self, handler: THandler, first: Multiaddr, rest: Vec<Multiaddr>)
-> Result<PeerPendingConnect<'a, TInEvent, TOutEvent, THandler, THandlerErr>, Self>
{
self.nodes.start_dial_out(self.peer_id.clone(), handler, first, rest);
Ok(PeerPendingConnect {
attempt: match self.nodes.reach_attempts.out_reach_attempts.entry(self.peer_id) {
Entry::Occupied(e) => e,
Entry::Vacant(_) => {
panic!("We called out_reach_attempts.insert with this peer id just above")
},
},
active_nodes: &mut self.nodes.active_nodes,
})
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::Arc;
use parking_lot::Mutex;
use tokio::runtime::{Builder, Runtime};
use tests::dummy_transport::DummyTransport;
use tests::dummy_handler::{Handler, HandlerState, InEvent, OutEvent};
use tests::dummy_transport::ListenerState;
use tests::dummy_muxer::{DummyMuxer, DummyConnectionState};
use nodes::NodeHandlerEvent;
/// `transport()` gives back the transport the swarm was built with.
#[test]
fn query_transport() {
    let transport = DummyTransport::new();
    let expected = transport.clone();
    let raw_swarm = RawSwarm::<_, _, _, Handler, _>::new(transport, PeerId::random());
    let actual = raw_swarm.transport();
    assert_eq!(actual, &expected);
}
/// After a successful `listen_on`, the address shows up in `listeners()`.
#[test]
fn starts_listening() {
    let mut raw_swarm = RawSwarm::<_, _, _, Handler, _>::new(DummyTransport::new(), PeerId::random());
    let addr = "/ip4/127.0.0.1/tcp/1234".parse::<Multiaddr>().expect("bad multiaddr");
    let expected = addr.clone();

    let listen_res = raw_swarm.listen_on(addr);
    assert!(listen_res.is_ok());

    let listeners = raw_swarm.listeners().collect::<Vec<&Multiaddr>>();
    assert_eq!(listeners.len(), 1);
    assert_eq!(listeners[0], &expected);
}
/// `nat_traversal` yields nothing for an observed address the dummy transport
/// doesn't translate, and one address for an observed address that it does.
#[test]
fn nat_traversal_transforms_the_observed_address_according_to_the_transport_used() {
    // the DummyTransport nat_traversal increments the port number by one for Ip4 addresses
    let transport = DummyTransport::new();
    let mut raw_swarm = RawSwarm::<_, _, _, Handler, _>::new(transport, PeerId::random());

    let addr1 = "/ip4/127.0.0.1/tcp/1234".parse::<Multiaddr>().expect("bad multiaddr");
    // An unrelated outside address is returned as-is, no transform
    let outside_addr1 = "/memory".parse::<Multiaddr>().expect("bad multiaddr");

    let addr2 = "/ip4/127.0.0.2/tcp/1234".parse::<Multiaddr>().expect("bad multiaddr");
    let outside_addr2 = "/ip4/127.0.0.2/tcp/1234".parse::<Multiaddr>().expect("bad multiaddr");

    raw_swarm.listen_on(addr1).unwrap();
    raw_swarm.listen_on(addr2).unwrap();

    let natted_memory: Vec<_> = raw_swarm
        .nat_traversal(&outside_addr1)
        .map(|a| a.to_string())
        .collect();
    assert!(natted_memory.is_empty());

    let natted_ip4: Vec<_> = raw_swarm
        .nat_traversal(&outside_addr2)
        .map(|a| a.to_string())
        .collect();
    assert_eq!(natted_ip4, vec!["/ip4/127.0.0.2/tcp/1234"])
}
/// Dials an address, polls the swarm until it reports `Connected`, and then checks that the
/// reported peer is seen as `Peer::Connected`.
#[test]
fn successful_dial_reaches_a_node() {
    let mut raw = RawSwarm::<_, _, _, Handler, _>::new(DummyTransport::new(), PeerId::random());
    let target = "/ip4/127.0.0.1/tcp/1234".parse::<Multiaddr>().expect("bad multiaddr");
    assert!(raw.dial(target, Handler::default()).is_ok());

    let swarm = Arc::new(Mutex::new(raw));
    let mut rt = Runtime::new().unwrap();

    // Each round polls the swarm once on the runtime; any outcome other than `Connected`
    // resolves to `None` and triggers another round.
    let connected_id = loop {
        let shared = Arc::clone(&swarm);
        let outcome = rt.block_on(future::poll_fn(move || -> Poll<Option<PeerId>, ()> {
            let mut guard = shared.lock();
            match guard.poll() {
                Async::Ready(RawSwarmEvent::Connected { peer_id, .. }) => Ok(Async::Ready(Some(peer_id))),
                _ => Ok(Async::Ready(None))
            }
        })).expect("tokio works");
        if let Some(id) = outcome {
            break id;
        }
    };

    let mut guard = swarm.lock();
    let peer = guard.peer(connected_id);
    assert_matches!(peer, Peer::Connected(PeerConnected{..}));
}
/// `num_incoming_negotiated` must be 0 before any poll and 1 after an incoming connection
/// has been reported by `poll` and accepted.
#[test]
fn num_incoming_negotiated() {
    let mut transport = DummyTransport::new();
    // The listener is set up to immediately produce one (peer id, muxer) pair.
    let incoming_pair = (PeerId::random(), DummyMuxer::new());
    transport.set_initial_listener_state(ListenerState::Ok(Async::Ready(Some(incoming_pair))));

    let mut raw = RawSwarm::<_, _, _, Handler, _>::new(transport, PeerId::random());
    raw.listen_on("/memory".parse().unwrap()).unwrap();
    assert_eq!(raw.num_incoming_negotiated(), 0);

    let mut rt = Runtime::new().unwrap();
    let swarm = Arc::new(Mutex::new(raw));
    let shared = Arc::clone(&swarm);

    // A single poll must surface the incoming connection, which is accepted on the spot.
    rt.block_on(future::poll_fn(move || -> Poll<(), ()> {
        let mut guard = shared.lock();
        assert_matches!(guard.poll(), Async::Ready(RawSwarmEvent::IncomingConnection(incoming)) => {
            incoming.accept(Handler::default());
        });
        Ok(Async::Ready(()))
    })).expect("tokio works");

    let guard = swarm.lock();
    assert_eq!(guard.num_incoming_negotiated(), 1);
}
/// Broadcasts `InEvent::NextState` to a handler scripted to emit a custom event, waits for
/// the connection to be established, and then checks that the next event produced by the
/// swarm is a `NodeEvent` carrying `OutEvent::Custom("from handler 1")`.
#[test]
fn broadcasted_events_reach_active_nodes() {
    let mut raw = RawSwarm::<_, _, _, Handler, _>::new(DummyTransport::new(), PeerId::random());

    // NOTE(review): this muxer is configured but never handed to the transport or the swarm
    // within this test; confirm whether it is still needed.
    let mut muxer = DummyMuxer::new();
    muxer.set_inbound_connection_state(DummyConnectionState::Pending);
    muxer.set_outbound_connection_state(DummyConnectionState::Opened);

    let target = "/ip4/127.0.0.1/tcp/1234".parse::<Multiaddr>().expect("bad multiaddr");
    let mut handler = Handler::default();
    let scripted_event = NodeHandlerEvent::Custom(OutEvent::Custom("from handler 1"));
    handler.next_states = vec![HandlerState::Ready(Some(scripted_event))];
    assert!(raw.dial(target, handler).is_ok());

    raw.broadcast_event(&InEvent::NextState);

    let swarm = Arc::new(Mutex::new(raw));
    let mut rt = Runtime::new().unwrap();

    // Phase 1: poll until the swarm reports `Connected`. The peer id itself is not used later.
    loop {
        let shared = Arc::clone(&swarm);
        let connected = rt.block_on(future::poll_fn(move || -> Poll<Option<PeerId>, ()> {
            let mut guard = shared.lock();
            match guard.poll() {
                Async::Ready(RawSwarmEvent::Connected { peer_id, .. }) => Ok(Async::Ready(Some(peer_id))),
                _ => Ok(Async::Ready(None))
            }
        })).expect("tokio works");
        if connected.is_some() {
            break;
        }
    }

    // Phase 2: poll until any event is ready; that event must be the handler's custom event.
    loop {
        let shared = Arc::clone(&swarm);
        let event_seen = rt.block_on(future::poll_fn(move || -> Poll<bool, ()> {
            let mut guard = shared.lock();
            match guard.poll() {
                Async::Ready(event) => {
                    assert_matches!(event, RawSwarmEvent::NodeEvent { peer_id: _, event: inner_event } => {
                        assert_matches!(inner_event, OutEvent::Custom("from handler 1"));
                    });
                    Ok(Async::Ready(true))
                },
                _ => Ok(Async::Ready(false))
            }
        })).expect("tokio works");
        if event_seen {
            break;
        }
    }
}
/// A random peer starts out as `Peer::NotConnected`; calling `connect` on it must succeed and
/// yield a `PeerPendingConnect`.
#[test]
fn querying_for_pending_peer() {
    let mut swarm = RawSwarm::<_, _, _, Handler, _>::new(DummyTransport::new(), PeerId::random());

    let peer = swarm.peer(PeerId::random());
    assert_matches!(peer, Peer::NotConnected(PeerNotConnected{ .. }));

    let dial_target: Multiaddr = "/memory".parse().expect("bad multiaddr");
    let not_connected = peer.as_not_connected().unwrap();
    let pending = not_connected.connect(dial_target, Handler::default());
    assert!(pending.is_ok());
    assert_matches!(pending, Ok(PeerPendingConnect { .. } ));
}
/// Looking up a peer the swarm has never dealt with must return `Peer::NotConnected` carrying
/// the very peer id that was queried.
#[test]
fn querying_for_unknown_peer() {
    let mut swarm = RawSwarm::<_, _, _, Handler, _>::new(DummyTransport::new(), PeerId::random());
    let queried_id = PeerId::random();

    let lookup = swarm.peer(queried_id.clone());
    assert_matches!(lookup, Peer::NotConnected( PeerNotConnected { nodes: _, peer_id: reported_id }) => {
        assert_eq!(reported_id, queried_id);
    });
}
/// Once a dial has produced a `Connected` event, looking up the reported peer id must return
/// `Peer::Connected`.
#[test]
fn querying_for_connected_peer() {
    let mut raw = RawSwarm::<_, _, _, Handler, _>::new(DummyTransport::new(), PeerId::random());
    let target = "/ip4/127.0.0.1/tcp/1234".parse().expect("bad multiaddr");
    raw.dial(target, Handler::default()).expect("dialing works");

    let swarm = Arc::new(Mutex::new(raw));
    let mut rt = Runtime::new().unwrap();

    // Poll once per round until the swarm reports which peer it connected to.
    let connected_id = loop {
        let shared = Arc::clone(&swarm);
        let outcome = rt.block_on(future::poll_fn(move || -> Poll<Option<PeerId>, ()> {
            let mut guard = shared.lock();
            match guard.poll() {
                Async::Ready(RawSwarmEvent::Connected { peer_id, .. }) => Ok(Async::Ready(Some(peer_id))),
                _ => Ok(Async::Ready(None))
            }
        })).expect("tokio works");
        if let Some(id) = outcome {
            break id;
        }
    };

    let mut guard = swarm.lock();
    let peer = guard.peer(connected_id);
    assert_matches!(peer, Peer::Connected( PeerConnected { .. } ));
}
/// When the listener's initial state is `Ok(Async::Ready(None))`, the first poll of the swarm
/// must yield `ListenerClosed`.
#[test]
fn poll_with_closed_listener() {
    let mut transport = DummyTransport::new();
    transport.set_initial_listener_state(ListenerState::Ok(Async::Ready(None)));

    let mut raw = RawSwarm::<_, _, _, Handler, _>::new(transport, PeerId::random());
    raw.listen_on("/memory".parse().unwrap()).unwrap();

    let mut rt = Runtime::new().unwrap();
    let swarm = Arc::new(Mutex::new(raw));
    let shared = Arc::clone(&swarm);

    rt.block_on(future::poll_fn(move || -> Poll<(), ()> {
        let mut guard = shared.lock();
        assert_matches!(guard.poll(), Async::Ready(RawSwarmEvent::ListenerClosed { .. } ));
        Ok(Async::Ready(()))
    })).expect("tokio works");
}
/// Dialing a bare address (no expected peer id) through a transport on which
/// `make_dial_fail` was called must make the first ready event an `UnknownPeerDialError`.
#[test]
fn unknown_peer_that_is_unreachable_yields_unknown_peer_dial_error() {
    let mut transport = DummyTransport::new();
    transport.make_dial_fail();

    let mut raw = RawSwarm::<_, _, _, Handler, _>::new(transport, PeerId::random());
    let target = "/memory".parse::<Multiaddr>().expect("bad multiaddr");
    assert!(raw.dial(target, Handler::default()).is_ok());

    let swarm = Arc::new(Mutex::new(raw));
    let mut rt = Runtime::new().unwrap();

    // Keep polling for as long as the swarm has nothing to report; the first event that does
    // show up must be the dial error.
    loop {
        let shared = Arc::clone(&swarm);
        let finished = rt.block_on(future::poll_fn(move || -> Poll<bool, ()> {
            let mut guard = shared.lock();
            match guard.poll() {
                Async::Ready(event) => {
                    assert_matches!(event, RawSwarmEvent::UnknownPeerDialError { .. } );
                    Ok(Async::Ready(true))
                },
                Async::NotReady => Ok(Async::Ready(false)),
            }
        })).expect("tokio works");
        if finished {
            break;
        }
    }
}
/// Connecting to a specific peer id through a transport on which `make_dial_fail` was called
/// must make the first ready event a `DialError` naming that same peer id.
#[test]
fn known_peer_that_is_unreachable_yields_dial_error() {
    let mut transport = DummyTransport::new();
    let peer_id = PeerId::random();
    transport.set_next_peer_id(&peer_id);
    transport.make_dial_fail();
    let swarm = Arc::new(Mutex::new(RawSwarm::<_, _, _, Handler, _>::new(transport, PeerId::random())));

    // Start the connection attempt inside a scope so the lock is released before polling.
    {
        let mut guard = swarm.lock();
        let peer = guard.peer(peer_id.clone());
        assert_matches!(peer, Peer::NotConnected(PeerNotConnected{ .. }));

        let target = "/memory".parse::<Multiaddr>().expect("bad multiaddr");
        let not_connected = peer.as_not_connected().unwrap();
        let pending = not_connected.connect(target, Handler::default());
        assert!(pending.is_ok());
        assert_matches!(pending, Ok(PeerPendingConnect { .. } ));
    }

    let mut rt = Runtime::new().unwrap();
    loop {
        let shared = Arc::clone(&swarm);
        // Each round needs its own copy because the closure takes ownership of it.
        let expected_id = peer_id.clone();
        let finished = rt.block_on(future::poll_fn(move || -> Poll<bool, ()> {
            let mut guard = shared.lock();
            match guard.poll() {
                Async::Ready(event) => {
                    let failed_id = assert_matches!(
                        event,
                        RawSwarmEvent::DialError { remain_addrs_attempt: _, peer_id: failed_id, .. } => failed_id
                    );
                    assert_eq!(expected_id, failed_id);
                    Ok(Async::Ready(true))
                },
                Async::NotReady => Ok(Async::Ready(false)),
            }
        })).expect("tokio works");
        if finished {
            break;
        }
    }
}
/// Connects to a peer with a handler whose only scripted state is `HandlerState::Err`.
/// `InEvent::NextState` is broadcast before every poll of the first phase; the first ready
/// event must be `Connected`, and the poll that follows must yield `NodeError` for the
/// dialed peer.
#[test]
fn yields_node_error_when_there_is_an_error_after_successful_connect() {
    let mut transport = DummyTransport::new();
    let peer_id = PeerId::random();
    transport.set_next_peer_id(&peer_id);
    let swarm = Arc::new(Mutex::new(RawSwarm::<_, _, _, Handler, _>::new(transport, PeerId::random())));

    // Kick off the connection inside a scope so the lock is released before polling.
    {
        let mut guard = swarm.lock();
        let peer = guard.peer(peer_id.clone());
        let target: Multiaddr = "/unix/reachable".parse().expect("bad multiaddr");
        let mut failing_handler = Handler::default();
        failing_handler.next_states = vec![HandlerState::Err];
        let not_connected = peer.as_not_connected().unwrap();
        not_connected.connect(target, failing_handler).expect("can connect unconnected peer");
    }

    let mut rt = Builder::new().core_threads(1).build().unwrap();

    // Phase 1: broadcast, then poll, until an event is ready. That event must be `Connected`.
    loop {
        let shared = Arc::clone(&swarm);
        let connected = rt.block_on(future::poll_fn(move || -> Poll<bool, ()> {
            let mut guard = shared.lock();
            guard.broadcast_event(&InEvent::NextState);
            match guard.poll() {
                Async::Ready(event) => {
                    assert_matches!(event, RawSwarmEvent::Connected { .. });
                    Ok(Async::Ready(true))
                },
                Async::NotReady => Ok(Async::Ready(false)),
            }
        })).expect("tokio works");
        if connected {
            break;
        }
    }

    // Phase 2: a single further poll must report the node error for the same peer.
    let shared = Arc::clone(&swarm);
    let expected_peer_id = peer_id.clone();
    rt.block_on(future::poll_fn(move || -> Poll<(), ()> {
        let mut guard = shared.lock();
        assert_matches!(guard.poll(), Async::Ready(RawSwarmEvent::NodeError { peer_id, .. }) => {
            assert_eq!(peer_id, expected_peer_id);
        });
        Ok(Async::Ready(()))
    })).expect("tokio works");
}
/// Connects to a peer with a handler whose only scripted state is `HandlerState::Ready(None)`.
/// `InEvent::NextState` is broadcast before every poll of the first phase; the first ready
/// event must be `Connected`, and the poll that follows must yield `NodeClosed` for the
/// dialed peer.
#[test]
fn yields_node_closed_when_the_node_closes_after_successful_connect() {
    let mut transport = DummyTransport::new();
    let peer_id = PeerId::random();
    transport.set_next_peer_id(&peer_id);
    let swarm = Arc::new(Mutex::new(RawSwarm::<_, _, _, Handler, _>::new(transport, PeerId::random())));

    // Kick off the connection inside a scope so the lock is released before polling.
    {
        let mut guard = swarm.lock();
        let peer = guard.peer(peer_id.clone());
        let target: Multiaddr = "/unix/reachable".parse().expect("bad multiaddr");
        let mut closing_handler = Handler::default();
        closing_handler.next_states = vec![HandlerState::Ready(None)];
        let not_connected = peer.as_not_connected().unwrap();
        not_connected.connect(target, closing_handler).expect("can connect unconnected peer");
    }

    let mut rt = Builder::new().core_threads(1).build().unwrap();

    // Phase 1: broadcast, then poll, until an event is ready. That event must be `Connected`.
    loop {
        let shared = Arc::clone(&swarm);
        let connected = rt.block_on(future::poll_fn(move || -> Poll<bool, ()> {
            let mut guard = shared.lock();
            guard.broadcast_event(&InEvent::NextState);
            match guard.poll() {
                Async::Ready(event) => {
                    assert_matches!(event, RawSwarmEvent::Connected { .. });
                    Ok(Async::Ready(true))
                },
                Async::NotReady => Ok(Async::Ready(false)),
            }
        })).expect("tokio works");
        if connected {
            break;
        }
    }

    // Phase 2: a single further poll must report the closure of the same peer.
    let shared = Arc::clone(&swarm);
    let expected_peer_id = peer_id.clone();
    rt.block_on(future::poll_fn(move || -> Poll<(), ()> {
        let mut guard = shared.lock();
        assert_matches!(guard.poll(), Async::Ready(RawSwarmEvent::NodeClosed { peer_id, .. }) => {
            assert_eq!(peer_id, expected_peer_id);
        });
        Ok(Async::Ready(()))
    })).expect("tokio works");
}
/// For 1000 random pairs of peer ids, `has_dial_prio` must give opposite answers when its
/// two arguments are swapped.
#[test]
fn local_prio_equivalence_relation() {
    (0..1000).for_each(|_| {
        let (first, second) = (PeerId::random(), PeerId::random());
        let forward = has_dial_prio(&first, &second);
        let backward = has_dial_prio(&second, &first);
        assert_ne!(forward, backward);
    });
}
}