use std::io;
use std::iter;
use asynchronous_codec::{FramedRead, FramedWrite};
use connexa::prelude::transport::upgrade::InboundUpgrade;
use connexa::prelude::transport::upgrade::OutboundUpgrade;
use connexa::prelude::transport::upgrade::UpgradeInfo;
use connexa::prelude::StreamProtocol;
use futures::{future::BoxFuture, AsyncRead, AsyncWrite, SinkExt, StreamExt};
use super::{bitswap_pb, message::BitswapMessage};
/// Protocol identifier advertised by both the inbound ([`BitswapProtocol`]) and
/// outbound ([`BitswapMessage`]) upgrades defined in this file.
const PROTOCOL: StreamProtocol = StreamProtocol::new("/ipfs/bitswap/1.2.0");
/// Size value (2 * 1024 * 1024) handed to the protobuf codec on both the read
/// and the write side.
// NOTE(review): presumably the maximum encoded message length in bytes —
// confirm against `quick_protobuf_codec::Codec::new`.
const MAX_BUF_SIZE: usize = 2_097_152;
/// Unit type used as the inbound side of the bitswap upgrade.
///
/// It carries no state; its `InboundUpgrade` implementation in this file reads
/// one protobuf frame from the socket and decodes it into a `BitswapMessage`.
#[derive(Debug, Clone, Default)]
pub struct BitswapProtocol;
/// Protocol negotiation info for the inbound side: exactly one protocol name.
impl UpgradeInfo for BitswapProtocol {
    type Info = StreamProtocol;
    type InfoIter = iter::Once<StreamProtocol>;

    /// Returns a one-element iterator yielding [`PROTOCOL`].
    fn protocol_info(&self) -> Self::InfoIter {
        let supported = PROTOCOL;
        iter::once(supported)
    }
}
/// Inbound upgrade: receive a single bitswap message from the remote.
impl<TSocket> InboundUpgrade<TSocket> for BitswapProtocol
where
    TSocket: AsyncRead + AsyncWrite + Send + Unpin + 'static,
{
    type Output = BitswapMessage;
    type Error = io::Error;
    type Future = BoxFuture<'static, Result<Self::Output, Self::Error>>;

    /// Reads one protobuf frame from `socket` and converts it into a
    /// [`BitswapMessage`].
    ///
    /// # Errors
    ///
    /// * `io::ErrorKind::UnexpectedEof` if the stream ends before yielding a frame.
    /// * The codec's error (converted to `io::Error`) if the frame cannot be read.
    /// * The conversion error from `BitswapMessage::from_proto`, which is also
    ///   logged through `tracing::error!` before being returned.
    fn upgrade_inbound(self, socket: TSocket, _: Self::Info) -> Self::Future {
        Box::pin(async move {
            let codec = quick_protobuf_codec::Codec::<bitswap_pb::Message>::new(MAX_BUF_SIZE);
            let mut reader = FramedRead::new(socket, codec);

            // Only the first frame is consumed; a stream that ends without
            // producing one is reported as an unexpected EOF.
            let proto = match reader.next().await {
                Some(frame) => frame?,
                None => return Err(io::Error::from(io::ErrorKind::UnexpectedEof)),
            };

            match BitswapMessage::from_proto(proto) {
                Ok(message) => Ok(message),
                Err(e) => {
                    tracing::error!(error = %e, "unable to parse bitswap message");
                    Err(io::Error::from(e))
                }
            }
        })
    }
}
/// Protocol negotiation info for the outbound side: the message itself acts as
/// the upgrade and advertises exactly one protocol name.
impl UpgradeInfo for BitswapMessage {
    type Info = StreamProtocol;
    type InfoIter = iter::Once<StreamProtocol>;

    /// Returns a one-element iterator yielding [`PROTOCOL`].
    fn protocol_info(&self) -> Self::InfoIter {
        let supported = PROTOCOL;
        iter::once(supported)
    }
}
/// Outbound upgrade: send this bitswap message to the remote.
impl<TSocket> OutboundUpgrade<TSocket> for BitswapMessage
where
    TSocket: AsyncRead + AsyncWrite + Send + Unpin + 'static,
{
    type Output = ();
    type Error = io::Error;
    type Future = BoxFuture<'static, Result<Self::Output, Self::Error>>;

    /// Encodes `self` to its protobuf form, writes it as one frame to
    /// `socket`, and then closes the write half.
    ///
    /// # Errors
    ///
    /// Returns an `io::Error` if the protobuf conversion, the send, or the
    /// close fails.
    #[inline]
    fn upgrade_outbound(self, socket: TSocket, _info: Self::Info) -> Self::Future {
        Box::pin(async move {
            let codec = quick_protobuf_codec::Codec::<bitswap_pb::Message>::new(MAX_BUF_SIZE);
            let mut writer = FramedWrite::new(socket, codec);

            let proto = self.into_proto()?;
            writer.send(proto).await?;
            // Close explicitly so that a failure while closing is surfaced to
            // the caller instead of being lost when the writer is dropped.
            writer.close().await?;

            Ok(())
        })
    }
}
/// Outcome of a completed bitswap upgrade, covering both directions.
///
/// Both variants can be built through the `From` impls in this file.
#[derive(Debug)]
pub enum Message {
/// A message was received; built from a `BitswapMessage`, which is the
/// output type of the inbound upgrade.
Receive { message: BitswapMessage },
/// A message was sent; built from `()`, which is the output type of the
/// outbound upgrade.
Sent,
}
impl From<BitswapMessage> for Message {
#[inline]
fn from(message: BitswapMessage) -> Self {
Message::Receive { message }
}
}
impl From<()> for Message {
#[inline]
fn from(_: ()) -> Self {
Message::Sent
}
}