use super::chan::ClientChanEvent;
use super::ichan::{ConnectingChannel, IncomingChannelsEvent, NewChannelError};
use crate::types::{InStream, OutStream};
use err_derive::Error;
use futures::{
channel::{mpsc, oneshot},
prelude::*,
};
use quinn::ConnectionError;
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
/// Errors that can be reported while an outgoing stream is being established.
///
/// The `display` string on each variant is the message used by the `Display`
/// implementation that the `err_derive` derive generates.
#[derive(Debug, Error)]
pub enum ConnectingStreamError {
/// An internal event could not be queued. `ConnectingStream::poll` reports
/// this through the result handle when sending the stream request on one of
/// its unbounded event channels fails.
#[error(display = "Could not send event")]
Event,
/// The coordinator answered the request with an error.
#[error(display = "Coordinator responded with error")]
Coord,
/// The requested stream id was already in use.
#[error(display = "Reused stream id")]
StreamId,
/// I/O failure while sending the initial message.
// NOTE(review): `no_from` presumably suppresses the derived `From<io::Error>`
// because more than one variant wraps `io::Error` — confirm against err_derive.
#[error(display = "Sending initial message: {:?}", _0)]
InitMsg(#[error(source, no_from)] io::Error),
/// I/O failure while flushing the initial message.
#[error(display = "Flushing init message: {:?}", _0)]
Flush(#[error(source, no_from)] io::Error),
/// The connection failed while opening a bidirectional channel.
#[error(display = "Opening bidirectional channel: {:?}", _0)]
OpenBi(#[source] ConnectionError),
/// The oneshot result channel was canceled. `ConnectingStream::poll` produces
/// this when polling its result receiver resolves to an error.
#[error(display = "Outgoing connection canceled: {:?}", _0)]
Canceled(#[source] oneshot::Canceled),
/// No peer matched the request; the payload is presumably the peer's name
/// (TODO confirm at the construction site).
#[error(display = "No such peer: {:?}", _0)]
NoSuchPeer(String),
/// I/O failure during stream initialization.
#[error(display = "Stream initialization: {:?}", _0)]
InitStream(#[source] io::Error),
}
// NOTE(review): `def_into_error!` is defined outside this chunk; presumably it
// generates a conversion from this error into a crate-level error type — confirm.
def_into_error!(ConnectingStreamError);
/// Sending half of the oneshot channel on which a `ConnectingStream` receives
/// its final result: either the `(OutStream, InStream)` pair or a
/// `ConnectingStreamError`.
pub(super) type ConnectingStreamHandle = oneshot::Sender<Result<(OutStream, InStream), ConnectingStreamError>>;
/// State a `ConnectingStream` holds while it waits on a `ConnectingChannel`.
///
/// Components, in order (as destructured by `ConnectingStream::poll`):
/// 1. the `ConnectingChannel` future that is polled to completion first;
/// 2. sender used to queue a `ClientChanEvent::Stream` request;
/// 3. sender used to queue an `IncomingChannelsEvent::NewStream` request;
/// 4. a `String` forwarded as the first field of either event
///    (presumably the target peer's name — TODO confirm);
/// 5. a `u64` forwarded as the second field of either event
///    (presumably the stream id — TODO confirm).
type ChannelData = (
ConnectingChannel,
mpsc::UnboundedSender<ClientChanEvent>,
mpsc::UnboundedSender<IncomingChannelsEvent>,
String,
u64,
);
/// Future that resolves to the `(OutStream, InStream)` pair of a new stream,
/// or to a `ConnectingStreamError`.
pub struct ConnectingStream {
/// Receiving half of the oneshot result channel; polled for the final outcome.
srec: oneshot::Receiver<Result<(OutStream, InStream), ConnectingStreamError>>,
/// Pending channel data together with the result handle. When present, the
/// contained `ConnectingChannel` is polled first; once it resolves this field
/// is taken (set to `None`) and the handle is forwarded inside an event.
connchan: Option<(ChannelData, ConnectingStreamHandle)>,
}
impl ConnectingStream {
    /// Creates a `ConnectingStream` backed by a fresh oneshot result channel.
    ///
    /// * With `Some(chandata)`, the sending handle is stored inside the future
    ///   next to the channel data, and `None` is returned as the second element.
    /// * With `None`, the future holds no channel data and the sending handle is
    ///   returned to the caller as `Some(handle)`.
    pub(super) fn new(chandata: Option<ChannelData>) -> (Self, Option<ConnectingStreamHandle>) {
        let (handle, srec) = oneshot::channel();
        // Exactly one of the two slots ends up owning the sending handle.
        let (connchan, returned_handle) = match chandata {
            Some(data) => (Some((data, handle)), None),
            None => (None, Some(handle)),
        };
        (Self { srec, connchan }, returned_handle)
    }
}
impl Future for ConnectingStream {
type Output = Result<(OutStream, InStream), ConnectingStreamError>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
if let Some(((ref mut chan, _, _, _, _), _)) = self.connchan.as_mut() {
if let Poll::Ready(rdy) = chan.poll_unpin(cx) {
use ClientChanEvent::Stream as CCStream;
use IncomingChannelsEvent::NewStream as ICStream;
let ((_, csnd, isnd, to, sid), sender) = self.connchan.take().unwrap();
if let Err(sender) = match rdy {
Ok(()) | Err(NewChannelError::Duplicate) => {
isnd.unbounded_send(ICStream(to, sid, sender)).map_err(IntoHandle::into)
}
_ => csnd.unbounded_send(CCStream(to, sid, sender)).map_err(IntoHandle::into),
} {
sender.send(Err(ConnectingStreamError::Event)).ok();
}
} else {
return Poll::Pending;
}
}
match self.srec.poll_unpin(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(Err(e)) => Poll::Ready(Err(ConnectingStreamError::Canceled(e))),
Poll::Ready(Ok(res)) => Poll::Ready(res),
}
}
}
/// Recovers the `ConnectingStreamHandle` carried by a value.
///
/// Implemented in this file for the `TrySendError`s of the two event channels,
/// so a failed send can hand the result handle back to the caller.
trait IntoHandle {
/// Consumes `e` and returns the handle it carries.
fn into(e: Self) -> ConnectingStreamHandle;
}
impl IntoHandle for mpsc::TrySendError<ClientChanEvent> {
    /// Extracts the result handle from a `ClientChanEvent::Stream` event that
    /// could not be sent.
    ///
    /// # Panics
    ///
    /// Panics with `"bad conversion"` if the unsent event is any other variant.
    fn into(err: Self) -> ConnectingStreamHandle {
        match err.into_inner() {
            ClientChanEvent::Stream(_, _, handle) => handle,
            _ => panic!("bad conversion"),
        }
    }
}
impl IntoHandle for mpsc::TrySendError<IncomingChannelsEvent> {
    /// Extracts the result handle from an `IncomingChannelsEvent::NewStream`
    /// event that could not be sent.
    ///
    /// # Panics
    ///
    /// Panics with `"bad conversion"` if the unsent event is any other variant.
    fn into(err: Self) -> ConnectingStreamHandle {
        match err.into_inner() {
            IncomingChannelsEvent::NewStream(_, _, handle) => handle,
            _ => panic!("bad conversion"),
        }
    }
}