use crate::{
PeerId,
muxing::StreamMuxer,
nodes::{
node::Substream,
handled_node_tasks::{HandledNodesEvent, HandledNodesTasks, TaskClosedEvent},
handled_node_tasks::{IntoNodeHandler, Task as HandledNodesTask, TaskId, ClosedTask},
handled_node::{HandledNodeError, NodeHandler}
}
};
use fnv::FnvHashMap;
use futures::prelude::*;
use std::{error, fmt, hash::Hash, mem};
mod tests;
/// A set of connection tasks, covering both reach attempts that are still in progress and
/// connections that have been accepted and are associated with a peer.
///
/// Every task stored in `inner` carries a `TaskState` as its user data. `nodes` indexes the
/// tasks that are in the `Connected` state by the id of the peer they are connected to.
pub struct CollectionStream<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TConnInfo = PeerId, TPeerId = PeerId> {
/// Underlying task collection; it is what `poll` polls for events.
inner: HandledNodesTasks<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TaskState<TConnInfo, TUserData>, TConnInfo>,
/// Maps the peer id of each accepted connection to the id of its task in `inner`.
nodes: FnvHashMap<TPeerId, TaskId>,
}
impl<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TConnInfo, TPeerId> fmt::Debug for
    CollectionStream<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TConnInfo, TPeerId>
where
    TConnInfo: fmt::Debug,
{
    /// Writes only the type name; none of the collection's contents are printed.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // A field-less `debug_tuple` emits nothing but its name, so writing the name
        // directly produces the same output.
        f.write_str("CollectionStream")
    }
}
/// State attached, as user data, to every task of the collection.
#[derive(Debug, Clone, PartialEq, Eq)]
enum TaskState<TConnInfo, TUserData> {
/// State given to a task when it is created; the task has not been accepted into the
/// collection (yet).
Pending,
/// The connection has been accepted. Holds the information about the connection and the
/// user data that was passed when accepting it.
Connected(TConnInfo, TUserData),
}
/// Event produced by `CollectionStream::poll`.
pub enum CollectionEvent<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TConnInfo, TPeerId> {
/// A reach attempt has succeeded. The inner object must be used to either accept or deny
/// the new connection.
NodeReached(CollectionReachEvent<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TConnInfo, TPeerId>),
/// The task of an accepted connection has closed. The peer has already been removed from
/// the collection when this event is produced.
NodeClosed {
/// Information about the connection that closed.
conn_info: TConnInfo,
/// Error reported by the task when it closed.
error: HandledNodeError<THandlerErr>,
/// User data that was associated with the connection.
user_data: TUserData,
},
/// A reach attempt has failed before being accepted.
ReachError {
/// Identifier of the reach attempt that failed.
id: ReachAttemptId,
/// Error that caused the failure.
error: TReachErr,
/// The handler that was passed when the attempt was started, handed back to the caller.
handler: THandler,
},
/// The task of an accepted connection has produced an event.
NodeEvent {
/// Access to the peer whose task produced the event.
peer: PeerMut<'a, TInEvent, TUserData, TConnInfo, TPeerId>,
/// The event itself.
event: TOutEvent,
},
}
impl<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TConnInfo, TPeerId> fmt::Debug for
CollectionEvent<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TConnInfo, TPeerId>
where TOutEvent: fmt::Debug,
TReachErr: fmt::Debug,
THandlerErr: fmt::Debug,
TConnInfo: fmt::Debug,
TUserData: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
match *self {
CollectionEvent::NodeReached(ref inner) => {
f.debug_tuple("CollectionEvent::NodeReached")
.field(inner)
.finish()
},
CollectionEvent::NodeClosed { ref conn_info, ref error, ref user_data } => {
f.debug_struct("CollectionEvent::NodeClosed")
.field("conn_info", conn_info)
.field("user_data", user_data)
.field("error", error)
.finish()
},
CollectionEvent::ReachError { ref id, ref error, .. } => {
f.debug_struct("CollectionEvent::ReachError")
.field("id", id)
.field("error", error)
.finish()
},
CollectionEvent::NodeEvent { ref peer, ref event } => {
f.debug_struct("CollectionEvent::NodeEvent")
.field("conn_info", peer.info())
.field("event", event)
.finish()
},
}
}
}
/// Event that happens when a reach attempt has succeeded.
///
/// The connection is only added to the collection if `accept` is called. If the event is
/// denied or simply dropped, the corresponding task is closed.
#[must_use = "The node reached event is used to accept the newly-opened connection"]
pub struct CollectionReachEvent<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TConnInfo = PeerId, TPeerId = PeerId> {
/// Information about the connection. Always `Some` while the event is alive; it is taken out
/// when the event is accepted or denied.
conn_info: Option<TConnInfo>,
/// Identifier of the task that reached the node.
id: TaskId,
/// The collection that produced this event.
parent: &'a mut CollectionStream<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TConnInfo, TPeerId>,
}
impl<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TConnInfo, TPeerId>
    CollectionReachEvent<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TConnInfo, TPeerId>
{
    /// Returns the information about the connection that has been reached.
    pub fn connection_info(&self) -> &TConnInfo {
        let info = self.conn_info.as_ref();
        info.expect("conn_info is always Some when the object is alive; QED")
    }

    /// Returns the id of the peer at the other end of the connection, as reported by the
    /// connection information.
    pub fn peer_id(&self) -> &TPeerId
    where
        TConnInfo: ConnectionInfo<PeerId = TPeerId>,
        TPeerId: Eq + Hash,
    {
        let info = self.connection_info();
        info.peer_id()
    }

    /// Returns the identifier of the reach attempt that led to this event.
    #[inline]
    pub fn reach_attempt_id(&self) -> ReachAttemptId {
        let task_id = self.id;
        ReachAttemptId(task_id)
    }
}
impl<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TConnInfo, TPeerId>
    CollectionReachEvent<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TConnInfo, TPeerId>
where
    TConnInfo: ConnectionInfo<PeerId = TPeerId>,
    TPeerId: Eq + Hash,
{
    /// Returns `true` if the collection already holds a connection to the same peer, in which
    /// case accepting this event replaces it.
    #[inline]
    pub fn would_replace(&self) -> bool {
        let peer_id = self.connection_info().peer_id();
        self.parent.nodes.contains_key(peer_id)
    }

    /// Accepts the new connection and associates `user_data` with it.
    ///
    /// The peer is registered in the collection and the task is switched to the connected
    /// state. If a connection to the same peer was already registered, its task is closed and
    /// its connection information and user data are returned in
    /// `CollectionNodeAccept::ReplacedExisting`.
    ///
    /// The second element of the returned tuple is the information about the connection that
    /// has just been accepted.
    pub fn accept(mut self, user_data: TUserData) -> (CollectionNodeAccept<TConnInfo, TUserData>, TConnInfo)
    where
        TConnInfo: Clone,
        TPeerId: Clone,
    {
        let new_info = self.conn_info.take()
            .expect("conn_info is always Some when the object is alive; QED");

        // Register the peer first; the previous entry, if any, tells us which task to replace.
        let replaced_id = self.parent.nodes.insert(new_info.peer_id().clone(), self.id);

        // Switch the state of our own task to `Connected`.
        *self.parent.inner.task(self.id)
            .expect("A CollectionReachEvent is only ever created from a valid attempt; QED")
            .user_data_mut() = TaskState::Connected(new_info.clone(), user_data);

        let tasks = &mut self.parent.inner;
        let outcome = match replaced_id.and_then(|former_id| tasks.task(former_id)) {
            Some(former_task) => {
                debug_assert!(match former_task.user_data() {
                    TaskState::Connected(info, _) => info.peer_id() == new_info.peer_id(),
                    TaskState::Pending => false,
                });
                match former_task.close().into_user_data() {
                    TaskState::Connected(old_info, old_user_data) =>
                        CollectionNodeAccept::ReplacedExisting(old_info, old_user_data),
                    _ => panic!("The former task was picked from `nodes`; all the nodes in `nodes` \
                                 are always in the connected state")
                }
            }
            None => CollectionNodeAccept::NewEntry,
        };

        // The destructor would close the task we have just accepted, so it must not run.
        mem::forget(self);
        (outcome, new_info)
    }

    /// Denies the connection and returns the information about it.
    ///
    /// Dropping the event closes the corresponding task.
    #[inline]
    pub fn deny(mut self) -> TConnInfo {
        let denied_info = self.conn_info.take()
            .expect("conn_info is always Some when the object is alive; QED");
        // Explicitly run the destructor, which closes the task.
        drop(self);
        denied_info
    }
}
impl<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TConnInfo, TPeerId> fmt::Debug for
    CollectionReachEvent<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TConnInfo, TPeerId>
where
    TConnInfo: fmt::Debug,
{
    /// Prints the connection information and the reach attempt id; the parent collection is
    /// not printed.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let attempt_id = self.reach_attempt_id();
        let mut out = f.debug_struct("CollectionReachEvent");
        out.field("conn_info", &self.conn_info);
        out.field("reach_attempt_id", &attempt_id);
        out.finish()
    }
}
impl<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TConnInfo, TPeerId> Drop for
    CollectionReachEvent<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TConnInfo, TPeerId>
{
    /// Closes the task of a reach event that has not been accepted.
    ///
    /// `accept` skips this destructor with `mem::forget`, so it only runs when the event is
    /// denied or dropped, at which point the task must still be in the pending state.
    fn drop(&mut self) {
        let pending_task = self.parent.inner.task(self.id)
            .expect("we create the CollectionReachEvent with a valid task id; the \
                     CollectionReachEvent mutably borrows the collection, therefore nothing \
                     can delete this task during the lifetime of the CollectionReachEvent; \
                     therefore the task is still valid when we delete it; QED");
        debug_assert!(match pending_task.user_data() {
            TaskState::Pending => true,
            TaskState::Connected(..) => false,
        });
        let _ = pending_task.close();
    }
}
/// Outcome of accepting a connection into the collection.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum CollectionNodeAccept<TConnInfo, TUserData> {
/// A connection to the same peer already existed and has been closed. Contains the
/// connection information and the user data of that former connection.
ReplacedExisting(TConnInfo, TUserData),
/// No connection to that peer was registered before.
NewEntry,
}
/// Identifier of a reach attempt. Wraps the id of the task that performs the attempt.
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct ReachAttemptId(TaskId);
/// Information about a connection, from which the identity of the remote peer can be obtained.
pub trait ConnectionInfo {
/// Type used to identify a peer. The collection uses it as a hash map key.
type PeerId: Eq + Hash;
/// Returns the identity of the peer this connection information refers to.
fn peer_id(&self) -> &Self::PeerId;
}
impl ConnectionInfo for PeerId {
    type PeerId = Self;

    /// A `PeerId` acts as its own connection information, so this simply returns `self`.
    fn peer_id(&self) -> &Self {
        self
    }
}
impl<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TConnInfo, TPeerId>
CollectionStream<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TConnInfo, TPeerId>
where
TConnInfo: ConnectionInfo<PeerId = TPeerId>,
TPeerId: Eq + Hash,
{
#[inline]
pub fn new() -> Self {
CollectionStream {
inner: HandledNodesTasks::new(),
nodes: Default::default(),
}
}
pub fn add_reach_attempt<TFut, TMuxer>(&mut self, future: TFut, handler: THandler)
-> ReachAttemptId
where
TFut: Future<Item = (TConnInfo, TMuxer), Error = TReachErr> + Send + 'static,
THandler: IntoNodeHandler<TConnInfo> + Send + 'static,
THandler::Handler: NodeHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static,
<THandler::Handler as NodeHandler>::OutboundOpenInfo: Send + 'static,
TReachErr: error::Error + Send + 'static,
THandlerErr: error::Error + Send + 'static,
TInEvent: Send + 'static,
TOutEvent: Send + 'static,
TMuxer: StreamMuxer + Send + Sync + 'static,
TMuxer::OutboundSubstream: Send + 'static,
TConnInfo: Send + 'static,
{
ReachAttemptId(self.inner.add_reach_attempt(future, TaskState::Pending, handler))
}
/// Interrupts a reach attempt that is still pending and closes its task.
///
/// # Errors
///
/// - `InterruptError::ReachAttemptNotFound` if no task matches `id`.
/// - `InterruptError::AlreadyReached` if the task is already in the connected state.
pub fn interrupt(&mut self, id: ReachAttemptId) -> Result<InterruptedReachAttempt<TInEvent, TConnInfo, TUserData>, InterruptError> {
    let task = self.inner.task(id.0).ok_or(InterruptError::ReachAttemptNotFound)?;

    // Only attempts that have not been accepted yet may be interrupted.
    if let TaskState::Connected(_, _) = task.user_data() {
        return Err(InterruptError::AlreadyReached);
    }

    Ok(InterruptedReachAttempt { inner: task.close() })
}
/// Forwards `event` to the underlying task collection's `broadcast_event`.
#[inline]
pub fn broadcast_event(&mut self, event: &TInEvent)
where
    TInEvent: Clone,
{
    let tasks = &mut self.inner;
    tasks.broadcast_event(event)
}
pub fn add_connection<TMuxer>(&mut self, conn_info: TConnInfo, user_data: TUserData, muxer: TMuxer, handler: THandler::Handler)
-> CollectionNodeAccept<TConnInfo, TUserData>
where
THandler: IntoNodeHandler<TConnInfo> + Send + 'static,
THandler::Handler: NodeHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static,
<THandler::Handler as NodeHandler>::OutboundOpenInfo: Send + 'static,
TReachErr: error::Error + Send + 'static,
THandlerErr: error::Error + Send + 'static,
TInEvent: Send + 'static,
TOutEvent: Send + 'static,
TMuxer: StreamMuxer + Send + Sync + 'static,
TMuxer::OutboundSubstream: Send + 'static,
TConnInfo: Clone + Send + 'static,
TPeerId: Clone,
{
let task_id = self.inner.add_connection(
TaskState::Pending,
muxer,
handler
);
CollectionReachEvent {
conn_info: Some(conn_info),
id: task_id,
parent: self,
}.accept(user_data).0
}
/// Grants mutable access to the peer identified by `id`.
///
/// Returns `None` if no connection to that peer is registered, or if the registered task
/// cannot be found in the underlying task collection.
#[inline]
pub fn peer_mut(&mut self, id: &TPeerId) -> Option<PeerMut<'_, TInEvent, TUserData, TConnInfo, TPeerId>> {
    let task_id = *self.nodes.get(id)?;
    let inner = self.inner.task(task_id)?;
    Some(PeerMut {
        inner,
        nodes: &mut self.nodes,
    })
}
/// Returns `true` if a connection to the peer identified by `id` is registered.
#[inline]
pub fn has_connection(&self, id: &TPeerId) -> bool {
    self.nodes.get(id).is_some()
}
/// Returns an iterator over the ids of all the peers that have a registered connection.
#[inline]
pub fn connections(&self) -> impl Iterator<Item = &TPeerId> {
    self.nodes.iter().map(|(peer_id, _task_id)| peer_id)
}
/// Polls the underlying task collection and translates the next event, if any, into a
/// `CollectionEvent`.
///
/// Returns `Async::NotReady` when the underlying collection has nothing to report.
///
/// - A pending task that closes with a reach error yields `ReachError`, handing the handler
///   back to the caller.
/// - A connected task that closes yields `NodeClosed`; the peer is removed from the
///   collection before the event is returned.
/// - A successful reach attempt yields `NodeReached`, which lets the caller accept or deny
///   the connection.
/// - An event produced by a connected task yields `NodeEvent`.
///
/// # Panics
///
/// Panics if the underlying collection reports a combination of task state and event that
/// contradicts the invariants spelled out in the panic messages below.
pub fn poll(&mut self) -> Async<CollectionEvent<'_, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TConnInfo, TPeerId>>
where
    TConnInfo: Clone,
{
    let item = match self.inner.poll() {
        Async::Ready(item) => item,
        Async::NotReady => return Async::NotReady,
    };

    match item {
        HandledNodesEvent::TaskClosed { task, result, handler } => {
            let id = task.id();
            let user_data = task.into_user_data();

            match (user_data, result, handler) {
                (TaskState::Pending, TaskClosedEvent::Reach(err), Some(handler)) => {
                    Async::Ready(CollectionEvent::ReachError {
                        id: ReachAttemptId(id),
                        error: err,
                        handler,
                    })
                },
                (TaskState::Pending, TaskClosedEvent::Node(_), _) => {
                    panic!("We switch the task state to Connected once we're connected, and \
                            a TaskClosedEvent::Node can only happen after we're \
                            connected; QED");
                },
                (TaskState::Pending, TaskClosedEvent::Reach(_), None) => {
                    panic!("The HandledNodesTasks is guaranteed to always return the handler \
                            when producing a TaskClosedEvent::Reach error");
                },
                (TaskState::Connected(conn_info, user_data), TaskClosedEvent::Node(err), _handler) => {
                    debug_assert!(_handler.is_none());
                    // The task is gone, so the peer must no longer be reachable through `nodes`.
                    let _node_task_id = self.nodes.remove(conn_info.peer_id());
                    debug_assert_eq!(_node_task_id, Some(id));

                    Async::Ready(CollectionEvent::NodeClosed {
                        conn_info,
                        error: err,
                        user_data,
                    })
                },
                (TaskState::Connected(_, _), TaskClosedEvent::Reach(_), _) => {
                    panic!("A TaskClosedEvent::Reach can only happen before we are connected \
                            to a node; therefore the TaskState won't be Connected; QED");
                },
            }
        },
        HandledNodesEvent::NodeReached { task, conn_info } => {
            let id = task.id();
            // Release the borrow held by `task` so that `self` can be lent to the event.
            drop(task);
            Async::Ready(CollectionEvent::NodeReached(CollectionReachEvent {
                parent: self,
                id,
                conn_info: Some(conn_info),
            }))
        },
        HandledNodesEvent::NodeEvent { task, event } => {
            // Clone the connection information so that the borrow held by `task` can be
            // released before looking the peer up again through `peer_mut`.
            let conn_info = match task.user_data() {
                TaskState::Connected(conn_info, _) => conn_info.clone(),
                _ => panic!("we can only receive NodeEvent events from a task after we \
                             received a corresponding NodeReached event from that same task; \
                             when we receive a NodeReached event, we ensure that the entry in \
                             self.tasks is switched to the Connected state; QED"),
            };
            drop(task);
            Async::Ready(CollectionEvent::NodeEvent {
                peer: self.peer_mut(conn_info.peer_id())
                    .expect("we can only receive NodeEvent events from a task after we \
                             received a corresponding NodeReached event from that same task; \
                             when that happens, peer_mut will always return Some; QED"),
                event,
            })
        }
    }
}
}
/// Reason why `CollectionStream::interrupt` refused to interrupt a reach attempt.
#[derive(Debug)]
pub enum InterruptError {
    /// No task matches the given reach attempt id.
    ReachAttemptNotFound,
    /// The task exists but is already in the connected state.
    AlreadyReached,
}

impl fmt::Display for InterruptError {
    /// Writes a one-sentence, human-readable description of the error.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let message = match self {
            InterruptError::ReachAttemptNotFound =>
                "The reach attempt could not be found.",
            InterruptError::AlreadyReached =>
                "The reach attempt has already completed or reached the node.",
        };
        f.write_str(message)
    }
}

impl error::Error for InterruptError {}
/// Reach attempt that has been interrupted through `CollectionStream::interrupt`.
///
/// It can be handed to `PeerMut::take_over`.
pub struct InterruptedReachAttempt<TInEvent, TConnInfo, TUserData> {
/// The closed task of the interrupted attempt.
inner: ClosedTask<TInEvent, TaskState<TConnInfo, TUserData>>,
}
impl<TInEvent, TConnInfo, TUserData> fmt::Debug for InterruptedReachAttempt<TInEvent, TConnInfo, TUserData>
where
    TUserData: fmt::Debug,
    TConnInfo: fmt::Debug,
{
    /// Formats the value as a tuple struct wrapping the closed task.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_tuple("InterruptedReachAttempt");
        out.field(&self.inner);
        out.finish()
    }
}
/// Mutable access to a peer that has a registered connection in the collection.
pub struct PeerMut<'a, TInEvent, TUserData, TConnInfo = PeerId, TPeerId = PeerId> {
/// The task that handles the connection to the peer.
inner: HandledNodesTask<'a, TInEvent, TaskState<TConnInfo, TUserData>>,
/// The collection's map from peer id to task id, needed to unregister the peer on close.
nodes: &'a mut FnvHashMap<TPeerId, TaskId>,
}
impl<'a, TInEvent, TUserData, TConnInfo, TPeerId> PeerMut<'a, TInEvent, TUserData, TConnInfo, TPeerId> {
    /// Returns the information about the connection to the peer.
    ///
    /// # Panics
    ///
    /// Panics if the underlying task is not in the connected state, which would be a
    /// violation of the invariant under which a `PeerMut` is built.
    pub fn info(&self) -> &TConnInfo {
        if let TaskState::Connected(info, _) = self.inner.user_data() {
            info
        } else {
            panic!("A PeerMut is only ever constructed from a peer in the connected \
                    state; QED")
        }
    }
}
impl<'a, TInEvent, TUserData, TConnInfo, TPeerId> PeerMut<'a, TInEvent, TUserData, TConnInfo, TPeerId>
where
    TConnInfo: ConnectionInfo<PeerId = TPeerId>,
    TPeerId: Eq + Hash,
{
    /// Returns the id of the peer, as reported by the connection information.
    pub fn id(&self) -> &TPeerId {
        let info = self.info();
        info.peer_id()
    }

    /// Returns the user data that was associated with the connection when it was accepted.
    pub fn user_data(&self) -> &TUserData {
        if let TaskState::Connected(_, data) = self.inner.user_data() {
            data
        } else {
            panic!("A PeerMut is only ever constructed from a peer in the connected \
                    state; QED")
        }
    }

    /// Returns a mutable reference to the user data associated with the connection.
    pub fn user_data_mut(&mut self) -> &mut TUserData {
        if let TaskState::Connected(_, data) = self.inner.user_data_mut() {
            data
        } else {
            panic!("A PeerMut is only ever constructed from a peer in the connected \
                    state; QED")
        }
    }

    /// Sends an event to the task that handles the connection to this peer.
    #[inline]
    pub fn send_event(&mut self, event: TInEvent) {
        let task = &mut self.inner;
        task.send_event(event)
    }

    /// Closes the connection to the peer, unregisters the peer from the collection and
    /// returns the user data that was associated with the connection.
    pub fn close(self) -> TUserData {
        let closed_id = self.inner.id();
        match self.inner.close().into_user_data() {
            TaskState::Connected(conn_info, user_data) => {
                let removed_id = self.nodes.remove(conn_info.peer_id());
                debug_assert_eq!(removed_id, Some(closed_id));
                user_data
            }
            TaskState::Pending => {
                panic!("a PeerMut can only be created if an entry is present in nodes; an entry in \
                        nodes always matched a Connected entry in the tasks; QED");
            }
        }
    }

    /// Hands an interrupted reach attempt over to the task of this peer.
    ///
    /// The interrupted attempt is expected to still be in the pending state.
    pub fn take_over(&mut self, id: InterruptedReachAttempt<TInEvent, TConnInfo, TUserData>) {
        let _previous = self.inner.take_over(id.inner);
        debug_assert!(match _previous {
            TaskState::Pending => true,
            TaskState::Connected(..) => false,
        });
    }
}