mod connecting;
mod stream;
use std::{
pin::Pin,
task::{Context, Poll},
};
pub use connecting::Connecting;
use futures::{future::BoxFuture, FutureExt};
use libp2p_core::muxing::{StreamMuxer, StreamMuxerEvent};
pub use stream::Stream;
use crate::{ConnectionError, Error};
/// Boxed future resolving to the two halves of a bidirectional QUIC stream,
/// or to the `quinn::ConnectionError` that prevented it from being established.
type BiStreamFuture =
    BoxFuture<'static, Result<(quinn::SendStream, quinn::RecvStream), quinn::ConnectionError>>;

/// A QUIC connection together with the in-flight futures used to drive it
/// from poll-based code.
pub struct Connection {
    /// Handle to the underlying `quinn` connection.
    connection: quinn::Connection,
    /// In-flight `accept_bi` call, if an inbound stream is currently being awaited.
    incoming: Option<BiStreamFuture>,
    /// In-flight `open_bi` call, if an outbound stream is currently being opened.
    outgoing: Option<BiStreamFuture>,
    /// In-flight `closed` call, set once closing of the connection has been started.
    closing: Option<BoxFuture<'static, quinn::ConnectionError>>,
}
impl Connection {
    /// Wraps a `quinn::Connection`.
    ///
    /// The returned value has no pending accept, open or close future; each
    /// one is created on demand the first time it is needed.
    fn new(connection: quinn::Connection) -> Self {
        let (incoming, outgoing, closing) = (None, None, None);
        Self {
            connection,
            incoming,
            outgoing,
            closing,
        }
    }
}
impl StreamMuxer for Connection {
    type Substream = Stream;
    type Error = Error;

    /// Polls for the next bidirectional stream opened by the remote.
    ///
    /// The `accept_bi` future is created lazily on the first call and kept in
    /// `self.incoming` while it is pending, so the same future is polled again
    /// on the next call.
    ///
    /// # Errors
    ///
    /// Returns the `quinn::ConnectionError` produced by `accept_bi`, wrapped in
    /// [`ConnectionError`] and converted into [`Error`].
    fn poll_inbound(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Result<Self::Substream, Self::Error>> {
        let this = self.get_mut();

        let incoming = this.incoming.get_or_insert_with(|| {
            let connection = this.connection.clone();
            async move { connection.accept_bi().await }.boxed()
        });

        let accepted = futures::ready!(incoming.poll_unpin(cx));
        // The future has completed. Drop it *before* propagating a possible
        // error: a finished `async` block must never be polled again, and
        // leaving it in the slot would make the next call do exactly that.
        this.incoming = None;

        let (send, recv) = accepted.map_err(ConnectionError)?;
        Poll::Ready(Ok(Stream::new(send, recv)))
    }

    /// Polls the opening of a new outbound bidirectional stream.
    ///
    /// The `open_bi` future is created lazily on the first call and kept in
    /// `self.outgoing` while it is pending, so the same future is polled again
    /// on the next call.
    ///
    /// # Errors
    ///
    /// Returns the `quinn::ConnectionError` produced by `open_bi`, wrapped in
    /// [`ConnectionError`] and converted into [`Error`].
    fn poll_outbound(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Result<Self::Substream, Self::Error>> {
        let this = self.get_mut();

        let outgoing = this.outgoing.get_or_insert_with(|| {
            let connection = this.connection.clone();
            async move { connection.open_bi().await }.boxed()
        });

        let opened = futures::ready!(outgoing.poll_unpin(cx));
        // Same reasoning as in `poll_inbound`: clear the slot as soon as the
        // future resolves, regardless of whether it succeeded.
        this.outgoing = None;

        let (send, recv) = opened.map_err(ConnectionError)?;
        Poll::Ready(Ok(Stream::new(send, recv)))
    }

    /// Always returns `Poll::Pending`: this implementation never produces a
    /// `StreamMuxerEvent` and does no work here.
    fn poll(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
    ) -> Poll<Result<StreamMuxerEvent, Self::Error>> {
        Poll::Pending
    }

    /// Closes the connection and waits until `quinn` reports it as closed.
    ///
    /// On the first call the connection is closed with error code `0` and an
    /// empty reason, and a `closed()` future is stored in `self.closing`.
    /// Later calls keep polling that same future.
    ///
    /// # Errors
    ///
    /// Any close reason other than `quinn::ConnectionError::LocallyClosed` is
    /// returned as `Error::Connection`.
    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        let this = self.get_mut();

        let closing = this.closing.get_or_insert_with(|| {
            this.connection.close(From::from(0u32), &[]);
            let connection = this.connection.clone();
            async move { connection.closed().await }.boxed()
        });

        match futures::ready!(closing.poll_unpin(cx)) {
            // We initiated the close ourselves just above, so this reason is
            // the successful outcome rather than a failure.
            quinn::ConnectionError::LocallyClosed => {}
            error => return Poll::Ready(Err(Error::Connection(ConnectionError(error)))),
        };

        Poll::Ready(Ok(()))
    }
}