rust-ipfs 0.15.0

IPFS node implementation
Documentation
use std::io;
use std::iter;

use asynchronous_codec::{FramedRead, FramedWrite};
use connexa::prelude::transport::upgrade::InboundUpgrade;
use connexa::prelude::transport::upgrade::OutboundUpgrade;
use connexa::prelude::transport::upgrade::UpgradeInfo;
use connexa::prelude::StreamProtocol;
use futures::{future::BoxFuture, AsyncRead, AsyncWrite, SinkExt, StreamExt};

use super::{bitswap_pb, message::BitswapMessage};

/// Protocol identifier (`/ipfs/bitswap/1.2.0`) returned from `protocol_info`
/// by both `UpgradeInfo` implementations in this file.
const PROTOCOL: StreamProtocol = StreamProtocol::new("/ipfs/bitswap/1.2.0");
/// Size limit, in bytes (2 MiB), handed to the protobuf codec when framing
/// bitswap messages in both the inbound and outbound upgrade.
const MAX_BUF_SIZE: usize = 2 * 1024 * 1024;

/// Field-less marker type for the bitswap stream protocol.
///
/// It carries no state; the `UpgradeInfo` and `InboundUpgrade`
/// implementations in this file are attached to it.
#[derive(Clone, Debug, Default)]
pub struct BitswapProtocol;

impl UpgradeInfo for BitswapProtocol {
    type InfoIter = iter::Once<StreamProtocol>;
    type Info = StreamProtocol;

    /// Yields exactly one protocol name: the module-level `PROTOCOL` constant.
    fn protocol_info(&self) -> iter::Once<StreamProtocol> {
        let supported = PROTOCOL;
        iter::once(supported)
    }
}

impl<TSocket> InboundUpgrade<TSocket> for BitswapProtocol
where
    TSocket: AsyncRead + AsyncWrite + Send + Unpin + 'static,
{
    type Output = BitswapMessage;
    type Error = io::Error;
    type Future = BoxFuture<'static, Result<Self::Output, Self::Error>>;

    /// Reads one protobuf frame from `socket` and converts it into a
    /// [`BitswapMessage`].
    ///
    /// # Errors
    ///
    /// * `io::ErrorKind::UnexpectedEof` if the stream ends before a frame is
    ///   produced.
    /// * The codec's error (converted to `io::Error`) if reading or decoding
    ///   the frame fails.
    /// * The error from `BitswapMessage::from_proto` if the decoded protobuf
    ///   cannot be turned into a message; this case is also logged.
    fn upgrade_inbound(self, socket: TSocket, _: Self::Info) -> Self::Future {
        Box::pin(async move {
            let codec = quick_protobuf_codec::Codec::<bitswap_pb::Message>::new(MAX_BUF_SIZE);
            let mut reader = FramedRead::new(socket, codec);

            // Only the first frame on the stream is consumed.
            let raw = match reader.next().await {
                Some(frame) => frame?,
                None => return Err(io::Error::from(io::ErrorKind::UnexpectedEof)),
            };

            let parsed = BitswapMessage::from_proto(raw);
            if let Err(e) = &parsed {
                tracing::error!(error = %e, "unable to parse bitswap message");
            }

            let message = parsed?;
            Ok(message)
        })
    }
}

impl UpgradeInfo for BitswapMessage {
    type InfoIter = iter::Once<StreamProtocol>;
    type Info = StreamProtocol;

    /// Yields exactly one protocol name: the module-level `PROTOCOL` constant.
    fn protocol_info(&self) -> iter::Once<StreamProtocol> {
        let supported = PROTOCOL;
        iter::once(supported)
    }
}

impl<TSocket> OutboundUpgrade<TSocket> for BitswapMessage
where
    TSocket: AsyncRead + AsyncWrite + Send + Unpin + 'static,
{
    type Output = ();
    type Error = io::Error;
    type Future = BoxFuture<'static, Result<Self::Output, Self::Error>>;

    /// Encodes this message as protobuf, writes it to `socket` as a single
    /// frame, and then closes the write side.
    ///
    /// # Errors
    ///
    /// Returns an `io::Error` if `into_proto` fails, or if sending the frame
    /// or closing the sink fails.
    #[inline]
    fn upgrade_outbound(self, socket: TSocket, _info: Self::Info) -> Self::Future {
        Box::pin(async move {
            let codec = quick_protobuf_codec::Codec::<bitswap_pb::Message>::new(MAX_BUF_SIZE);
            let mut writer = FramedWrite::new(socket, codec);

            let proto = self.into_proto()?;

            writer.send(proto).await?;
            writer.close().await?;
            Ok(())
        })
    }
}

/// Result of a completed bitswap upgrade, covering both directions.
///
/// Values are built through the `From` implementations in this file: a
/// `BitswapMessage` becomes [`Message::Receive`] and `()` becomes
/// [`Message::Sent`].
#[derive(Debug)]
pub enum Message {
    /// Wraps a `BitswapMessage`, which is the output type of the inbound upgrade.
    Receive { message: BitswapMessage },
    /// Built from `()`, which is the output type of the outbound upgrade.
    Sent,
}

impl From<BitswapMessage> for Message {
    #[inline]
    fn from(message: BitswapMessage) -> Self {
        Message::Receive { message }
    }
}

impl From<()> for Message {
    #[inline]
    fn from(_: ()) -> Self {
        Message::Sent
    }
}