use crate::identity::Keypair;
use crate::muxing::{IReadWrite, IStreamMuxer, StreamInfo, StreamMuxer, StreamMuxerEx};
use crate::secure_io::SecureInfo;
use crate::transport::{ConnectionInfo, TransportError};
use crate::upgrade::ProtocolName;
use crate::{Multiaddr, PeerId, PublicKey};
use async_std::io::Error;
use async_trait::async_trait;
use futures::future::BoxFuture;
use futures::{
io::{IoSlice, IoSliceMut},
prelude::*,
};
use libp2prs_traits::{ReadEx, SplitEx, WriteEx};
use pin_project::pin_project;
use std::{io, io::Error as IoError, pin::Pin, task::Context, task::Poll};
/// A value that is one of two alternatives, `A` or `B`.
///
/// Both payloads are marked `#[pin]`, so projecting a pinned
/// `AsyncEitherOutput` (via the generated `EitherOutputProj` enum) yields a
/// pinned reference to whichever payload is held.
#[pin_project(project = EitherOutputProj)]
#[derive(Debug, Copy, Clone)]
pub enum AsyncEitherOutput<A, B> {
/// The first alternative (structurally pinned).
A(#[pin] A),
/// The second alternative (structurally pinned).
B(#[pin] B),
}
/// Forwards every [`AsyncRead`] operation to whichever alternative is held.
impl<A, B> AsyncRead for AsyncEitherOutput<A, B>
where
    A: AsyncRead,
    B: AsyncRead,
{
    /// Polls the held alternative for bytes to copy into `buf` and returns
    /// its result unchanged.
    fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<Result<usize, IoError>> {
        // `project()` hands back a pinned reference to the payload.
        match self.project() {
            EitherOutputProj::A(inner) => inner.poll_read(cx, buf),
            EitherOutputProj::B(inner) => inner.poll_read(cx, buf),
        }
    }

    /// Vectored counterpart of `poll_read`, delegated to the held alternative.
    fn poll_read_vectored(self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &mut [IoSliceMut<'_>]) -> Poll<Result<usize, IoError>> {
        match self.project() {
            EitherOutputProj::A(inner) => inner.poll_read_vectored(cx, bufs),
            EitherOutputProj::B(inner) => inner.poll_read_vectored(cx, bufs),
        }
    }
}
/// Forwards every [`AsyncWrite`] operation to whichever alternative is held.
impl<A, B> AsyncWrite for AsyncEitherOutput<A, B>
where
    A: AsyncWrite,
    B: AsyncWrite,
{
    /// Polls the held alternative to write bytes from `buf`.
    fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize, IoError>> {
        match self.project() {
            EitherOutputProj::A(inner) => inner.poll_write(cx, buf),
            EitherOutputProj::B(inner) => inner.poll_write(cx, buf),
        }
    }

    /// Vectored counterpart of `poll_write`, delegated to the held alternative.
    fn poll_write_vectored(self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &[IoSlice<'_>]) -> Poll<Result<usize, IoError>> {
        match self.project() {
            EitherOutputProj::A(inner) => inner.poll_write_vectored(cx, bufs),
            EitherOutputProj::B(inner) => inner.poll_write_vectored(cx, bufs),
        }
    }

    /// Polls the held alternative's flush.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), IoError>> {
        match self.project() {
            EitherOutputProj::A(inner) => inner.poll_flush(cx),
            EitherOutputProj::B(inner) => inner.poll_flush(cx),
        }
    }

    /// Polls the held alternative's close.
    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), IoError>> {
        match self.project() {
            EitherOutputProj::A(inner) => inner.poll_close(cx),
            EitherOutputProj::B(inner) => inner.poll_close(cx),
        }
    }
}
/// A value that is one of two alternatives, `A` or `B`.
///
/// Trait implementations for this type in this file dispatch to whichever
/// alternative is held.
#[derive(Clone, Copy, Debug)]
pub enum EitherOutput<A, B> {
    /// The first alternative.
    A(A),
    /// The second alternative.
    B(B),
}
/// Reads through whichever alternative is held.
#[async_trait]
impl<A, B> ReadEx for EitherOutput<A, B>
where
    A: ReadEx,
    B: ReadEx,
{
    /// Awaits `read2` on the held alternative and returns its result as-is.
    async fn read2(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        match self {
            EitherOutput::A(left) => left.read2(buf).await,
            EitherOutput::B(right) => right.read2(buf).await,
        }
    }
}
/// Writes through whichever alternative is held.
#[async_trait]
impl<A, B> WriteEx for EitherOutput<A, B>
where
    A: WriteEx,
    B: WriteEx,
{
    /// Awaits `write2` on the held alternative and returns its result as-is.
    async fn write2(&mut self, buf: &[u8]) -> io::Result<usize> {
        match self {
            EitherOutput::A(left) => left.write2(buf).await,
            EitherOutput::B(right) => right.write2(buf).await,
        }
    }

    /// Awaits `flush2` on the held alternative.
    async fn flush2(&mut self) -> io::Result<()> {
        match self {
            EitherOutput::A(left) => left.flush2().await,
            EitherOutput::B(right) => right.flush2().await,
        }
    }

    /// Awaits `close2` on the held alternative.
    async fn close2(&mut self) -> io::Result<()> {
        match self {
            EitherOutput::A(left) => left.close2().await,
            EitherOutput::B(right) => right.close2().await,
        }
    }
}
/// One half (reader or writer) of a split [`EitherOutput`].
///
/// `EitherOutput::split` in this file builds both halves with the same
/// variant as the value being split.
///
/// Derives `Debug` like the other either-enums in this file, so values
/// holding it can be formatted for diagnostics when both payloads are `Debug`.
#[derive(Debug)]
pub enum EitherReaderWriter<A, B> {
    /// Half produced from the first alternative.
    A(A),
    /// Half produced from the second alternative.
    B(B),
}
#[async_trait]
impl<A, B> ReadEx for EitherReaderWriter<A, B>
where
A: ReadEx,
B: ReadEx,
{
async fn read2(&mut self, buf: &mut [u8]) -> Result<usize, Error> {
match self {
EitherReaderWriter::A(a) => a.read2(buf).await,
EitherReaderWriter::B(b) => b.read2(buf).await,
}
}
}
#[async_trait]
impl<A, B> WriteEx for EitherReaderWriter<A, B>
where
A: WriteEx,
B: WriteEx,
{
async fn write2(&mut self, buf: &[u8]) -> Result<usize, Error> {
match self {
EitherReaderWriter::A(a) => a.write2(buf).await,
EitherReaderWriter::B(b) => b.write2(buf).await,
}
}
async fn flush2(&mut self) -> Result<(), Error> {
match self {
EitherReaderWriter::A(a) => a.flush2().await,
EitherReaderWriter::B(b) => b.flush2().await,
}
}
async fn close2(&mut self) -> Result<(), Error> {
match self {
EitherReaderWriter::A(a) => a.close2().await,
EitherReaderWriter::B(b) => b.close2().await,
}
}
}
impl<A, B> SplitEx for EitherOutput<A, B>
where
A: SplitEx,
B: SplitEx,
{
type Reader = EitherReaderWriter<A::Reader, B::Reader>;
type Writer = EitherReaderWriter<A::Writer, B::Writer>;
fn split(self) -> (Self::Reader, Self::Writer) {
match self {
EitherOutput::A(a) => {
let (r, w) = a.split();
(EitherReaderWriter::A(r), EitherReaderWriter::A(w))
}
EitherOutput::B(b) => {
let (r, w) = b.split();
(EitherReaderWriter::B(r), EitherReaderWriter::B(w))
}
}
}
}
/// Reports the [`SecureInfo`] of whichever alternative is held.
impl<A, B> SecureInfo for EitherOutput<A, B>
where
    A: SecureInfo,
    B: SecureInfo,
{
    /// Local peer id as reported by the held alternative.
    fn local_peer(&self) -> PeerId {
        match self {
            Self::A(inner) => inner.local_peer(),
            Self::B(inner) => inner.local_peer(),
        }
    }

    /// Remote peer id as reported by the held alternative.
    fn remote_peer(&self) -> PeerId {
        match self {
            Self::A(inner) => inner.remote_peer(),
            Self::B(inner) => inner.remote_peer(),
        }
    }

    /// Local keypair as reported by the held alternative.
    fn local_priv_key(&self) -> Keypair {
        match self {
            Self::A(inner) => inner.local_priv_key(),
            Self::B(inner) => inner.local_priv_key(),
        }
    }

    /// Remote public key as reported by the held alternative.
    fn remote_pub_key(&self) -> PublicKey {
        match self {
            Self::A(inner) => inner.remote_pub_key(),
            Self::B(inner) => inner.remote_pub_key(),
        }
    }
}
/// Reports the [`StreamInfo`] of whichever alternative is held.
impl<A, B> StreamInfo for EitherOutput<A, B>
where
    A: StreamInfo,
    B: StreamInfo,
{
    /// Returns the `id()` of the held alternative.
    fn id(&self) -> usize {
        match self {
            Self::A(stream) => stream.id(),
            Self::B(stream) => stream.id(),
        }
    }
}
/// Drives whichever stream muxer alternative is held.
///
/// Every method forwards to the same-named method of the active variant and
/// returns its result unchanged.
#[async_trait]
impl<A, B> StreamMuxer for EitherOutput<A, B>
where
    A: StreamMuxer + Send,
    B: StreamMuxer + Send,
{
    /// Opens a stream on the held muxer.
    ///
    /// # Errors
    /// Returns whatever `TransportError` the inner muxer reports.
    async fn open_stream(&mut self) -> Result<IReadWrite, TransportError> {
        // The inner call already yields this exact `Result` type, so it is
        // returned directly rather than being unwrapped with `?` and re-wrapped.
        match self {
            EitherOutput::A(a) => a.open_stream().await,
            EitherOutput::B(b) => b.open_stream().await,
        }
    }

    /// Accepts a stream on the held muxer.
    ///
    /// # Errors
    /// Returns whatever `TransportError` the inner muxer reports.
    async fn accept_stream(&mut self) -> Result<IReadWrite, TransportError> {
        match self {
            EitherOutput::A(a) => a.accept_stream().await,
            EitherOutput::B(b) => b.accept_stream().await,
        }
    }

    /// Closes the held muxer.
    async fn close(&mut self) -> Result<(), TransportError> {
        match self {
            EitherOutput::A(a) => a.close().await,
            EitherOutput::B(b) => b.close().await,
        }
    }

    /// Returns the held muxer's `task()` value, if it provides one.
    fn task(&mut self) -> Option<BoxFuture<'static, ()>> {
        match self {
            EitherOutput::A(a) => a.task(),
            EitherOutput::B(b) => b.task(),
        }
    }

    /// Returns the result of `box_clone()` on the held muxer.
    fn box_clone(&self) -> IStreamMuxer {
        match self {
            EitherOutput::A(a) => a.box_clone(),
            EitherOutput::B(b) => b.box_clone(),
        }
    }
}
/// Reports the [`ConnectionInfo`] of whichever alternative is held.
impl<A, B> ConnectionInfo for EitherOutput<A, B>
where
    A: ConnectionInfo,
    B: ConnectionInfo,
{
    /// Local multiaddr as reported by the held alternative.
    fn local_multiaddr(&self) -> Multiaddr {
        match self {
            Self::A(conn) => conn.local_multiaddr(),
            Self::B(conn) => conn.local_multiaddr(),
        }
    }

    /// Remote multiaddr as reported by the held alternative.
    fn remote_multiaddr(&self) -> Multiaddr {
        match self {
            Self::A(conn) => conn.remote_multiaddr(),
            Self::B(conn) => conn.remote_multiaddr(),
        }
    }
}
/// Opts `EitherOutput` into `StreamMuxerEx` when both alternatives are
/// stream muxers that also provide connection and security info and are
/// `Debug`.
///
/// The impl body is empty: nothing is overridden here, so any behavior comes
/// from the trait's own definition (not visible in this file).
impl<A, B> StreamMuxerEx for EitherOutput<A, B>
where
A: StreamMuxer + ConnectionInfo + SecureInfo + std::fmt::Debug,
B: StreamMuxer + ConnectionInfo + SecureInfo + std::fmt::Debug,
{
}
/// A name that is one of two alternatives, `A` or `B`.
#[derive(Clone, Debug)]
pub enum EitherName<A, B> {
    /// Name of the first kind.
    A(A),
    /// Name of the second kind.
    B(B),
}
/// Yields the protocol name of whichever alternative is held.
impl<A, B> ProtocolName for EitherName<A, B>
where
    A: ProtocolName,
    B: ProtocolName,
{
    /// Returns the bytes produced by the held alternative's `protocol_name()`.
    fn protocol_name(&self) -> &[u8] {
        match self {
            Self::A(name) => name.protocol_name(),
            Self::B(name) => name.protocol_name(),
        }
    }
}