use std::{convert::Infallible, pin::Pin};
use futures::{
channel::{mpsc, oneshot},
future::{poll_fn, Either, Future},
SinkExt, StreamExt,
};
use libp2p_core::muxing::StreamMuxerBox;
use super::concurrent_dial::ConcurrentDial;
use crate::{
connection::{
self, ConnectionError, ConnectionId, PendingInboundConnectionError,
PendingOutboundConnectionError,
},
transport::TransportError,
ConnectionHandler, Multiaddr, PeerId,
};
/// Commands accepted by the task that drives an established connection
/// (see `new_for_established_connection`, which receives them over an mpsc channel).
#[derive(Debug)]
pub(crate) enum Command<T> {
/// Hand the contained event to the connection via `on_behaviour_event`.
NotifyHandler(T),
/// Close the connection: the task stops accepting commands, forwards the
/// connection's remaining events, waits for the muxer to close and then
/// terminates after reporting `EstablishedConnectionEvent::Closed`.
Close,
}
/// Events reported by the tasks that drive a pending (not yet established)
/// connection, whether outgoing or incoming.
pub(crate) enum PendingConnectionEvent {
/// The pending connection resolved successfully.
ConnectionEstablished {
/// Identifier of the connection this event refers to.
id: ConnectionId,
/// The peer and stream muxer produced by the dial / upgrade future.
output: (PeerId, StreamMuxerBox),
/// `Some((address, errors))` for outgoing connections, carrying the address
/// and the error list returned by the dial; `None` for incoming connections.
outgoing: Option<(Multiaddr, Vec<(Multiaddr, TransportError<std::io::Error>)>)>,
},
/// The pending connection failed or was aborted.
PendingFailed {
/// Identifier of the connection this event refers to.
id: ConnectionId,
/// `Either::Left` for outgoing connections, `Either::Right` for incoming ones.
error: Either<PendingOutboundConnectionError, PendingInboundConnectionError>,
},
}
/// Events reported by the task that drives an established connection.
#[derive(Debug)]
pub(crate) enum EstablishedConnectionEvent<ToBehaviour> {
/// The connection reported an address change (`connection::Event::AddressChange`).
AddressChange {
/// Identifier of the connection this event refers to.
id: ConnectionId,
/// Peer the connection belongs to.
peer_id: PeerId,
/// The address reported by the connection.
new_address: Multiaddr,
},
/// The connection produced an event for the behaviour, either while running
/// (`connection::Event::Handler`) or while being drained after `close()`.
Notify {
/// Identifier of the connection this event refers to.
id: ConnectionId,
/// Peer the connection belongs to.
peer_id: PeerId,
/// The event produced by the connection.
event: ToBehaviour,
},
/// The connection task is terminating; this is the last event it sends.
Closed {
/// Identifier of the connection this event refers to.
id: ConnectionId,
/// Peer the connection belongs to.
peer_id: PeerId,
/// `Some` when the connection failed or closing the muxer returned an
/// error; `None` when a requested close completed without error.
error: Option<ConnectionError>,
},
}
/// Drives a pending outgoing connection until the dial resolves or is aborted.
///
/// `dial` is raced against `abort_receiver`, and exactly one
/// [`PendingConnectionEvent`] is then sent on `events`:
///
/// * dial succeeded: `ConnectionEstablished` with `outgoing: Some((address, errors))`;
/// * dial failed: `PendingFailed` with `PendingOutboundConnectionError::Transport`;
/// * `abort_receiver` was cancelled (its sender was dropped): `PendingFailed`
///   with `PendingOutboundConnectionError::Aborted`.
///
/// A failure to deliver the event is deliberately ignored.
pub(crate) async fn new_for_pending_outgoing_connection(
    connection_id: ConnectionId,
    dial: ConcurrentDial,
    abort_receiver: oneshot::Receiver<Infallible>,
    mut events: mpsc::Sender<PendingConnectionEvent>,
) {
    // Held in a named binding so that the future which lost the race is not
    // dropped until after the event below has been sent.
    let outcome = futures::future::select(abort_receiver, Box::pin(dial)).await;

    let event = match outcome {
        Either::Right((Ok((address, output, errors)), _)) => {
            PendingConnectionEvent::ConnectionEstablished {
                id: connection_id,
                output,
                outgoing: Some((address, errors)),
            }
        }
        Either::Right((Err(e), _)) => PendingConnectionEvent::PendingFailed {
            id: connection_id,
            error: Either::Left(PendingOutboundConnectionError::Transport(e)),
        },
        Either::Left((Err(oneshot::Canceled), _)) => PendingConnectionEvent::PendingFailed {
            id: connection_id,
            error: Either::Left(PendingOutboundConnectionError::Aborted),
        },
        // The abort channel carries `Infallible`, so a value can never arrive.
        Either::Left((Ok(never), _)) => libp2p_core::util::unreachable(never),
    };

    let _ = events.send(event).await;
}
/// Drives a pending incoming connection until `future` resolves or is aborted.
///
/// `future` is raced against `abort_receiver`, and exactly one
/// [`PendingConnectionEvent`] is then sent on `events`:
///
/// * `future` succeeded: `ConnectionEstablished` with `outgoing: None`;
/// * `future` failed: `PendingFailed` with the I/O error wrapped as
///   `PendingInboundConnectionError::Transport(TransportError::Other(..))`;
/// * `abort_receiver` was cancelled (its sender was dropped): `PendingFailed`
///   with `PendingInboundConnectionError::Aborted`.
///
/// A failure to deliver the event is deliberately ignored.
pub(crate) async fn new_for_pending_incoming_connection<TFut>(
    connection_id: ConnectionId,
    future: TFut,
    abort_receiver: oneshot::Receiver<Infallible>,
    mut events: mpsc::Sender<PendingConnectionEvent>,
) where
    TFut: Future<Output = Result<(PeerId, StreamMuxerBox), std::io::Error>> + Send + 'static,
{
    // Held in a named binding so that the future which lost the race is not
    // dropped until after the event below has been sent.
    let outcome = futures::future::select(abort_receiver, Box::pin(future)).await;

    let event = match outcome {
        Either::Right((Ok(output), _)) => PendingConnectionEvent::ConnectionEstablished {
            id: connection_id,
            output,
            outgoing: None,
        },
        Either::Right((Err(e), _)) => PendingConnectionEvent::PendingFailed {
            id: connection_id,
            error: Either::Right(PendingInboundConnectionError::Transport(
                TransportError::Other(e),
            )),
        },
        Either::Left((Err(oneshot::Canceled), _)) => PendingConnectionEvent::PendingFailed {
            id: connection_id,
            error: Either::Right(PendingInboundConnectionError::Aborted),
        },
        // The abort channel carries `Infallible`, so a value can never arrive.
        Either::Left((Ok(never), _)) => libp2p_core::util::unreachable(never),
    };

    let _ = events.send(event).await;
}
pub(crate) async fn new_for_established_connection<THandler>(
connection_id: ConnectionId,
peer_id: PeerId,
mut connection: crate::connection::Connection<THandler>,
mut command_receiver: mpsc::Receiver<Command<THandler::FromBehaviour>>,
mut events: mpsc::Sender<EstablishedConnectionEvent<THandler::ToBehaviour>>,
) where
THandler: ConnectionHandler,
{
loop {
match futures::future::select(
command_receiver.next(),
poll_fn(|cx| Pin::new(&mut connection).poll(cx)),
)
.await
{
Either::Left((Some(command), _)) => match command {
Command::NotifyHandler(event) => connection.on_behaviour_event(event),
Command::Close => {
command_receiver.close();
let (remaining_events, closing_muxer) = connection.close();
let _ = events
.send_all(&mut remaining_events.map(|event| {
Ok(EstablishedConnectionEvent::Notify {
id: connection_id,
event,
peer_id,
})
}))
.await;
let error = closing_muxer.await.err().map(ConnectionError::IO);
let _ = events
.send(EstablishedConnectionEvent::Closed {
id: connection_id,
peer_id,
error,
})
.await;
return;
}
},
Either::Left((None, _)) => return,
Either::Right((event, _)) => {
match event {
Ok(connection::Event::Handler(event)) => {
let _ = events
.send(EstablishedConnectionEvent::Notify {
id: connection_id,
peer_id,
event,
})
.await;
}
Ok(connection::Event::AddressChange(new_address)) => {
let _ = events
.send(EstablishedConnectionEvent::AddressChange {
id: connection_id,
peer_id,
new_address,
})
.await;
}
Err(error) => {
command_receiver.close();
let (remaining_events, _closing_muxer) = connection.close();
let _ = events
.send_all(&mut remaining_events.map(|event| {
Ok(EstablishedConnectionEvent::Notify {
id: connection_id,
event,
peer_id,
})
}))
.await;
let _ = events
.send(EstablishedConnectionEvent::Closed {
id: connection_id,
peer_id,
error: Some(error),
})
.await;
return;
}
}
}
}
}
}