use std::{io::Result, pin::Pin};
use futures::{stream::unfold, AsyncRead, AsyncWrite};
use libp2p_identity::PeerId;
use multiaddr::Multiaddr;
use multistream_select::{dialer_select_proto, Version};
use crate::{driver_wrapper, switch::Switch, XStackRpc, PROTOCOL_IPFS_PING};
pub mod transport_syscall {
use std::{
io::Result,
task::{Context, Poll},
};
use async_trait::async_trait;
use libp2p_identity::PublicKey;
use multiaddr::Multiaddr;
use crate::switch::Switch;
use super::*;
#[async_trait]
pub trait DriverTransport: Send + Sync {
async fn bind(&self, switch: &Switch, laddr: &Multiaddr) -> Result<TransportListener>;
async fn connect(&self, switch: &Switch, raddr: &Multiaddr) -> Result<P2pConn>;
fn multiaddr_hint(&self, addr: &Multiaddr) -> bool;
fn activities(&self) -> usize;
fn name(&self) -> &str;
}
#[async_trait]
pub trait DriverListener: Sync + Sync {
async fn accept(&mut self) -> Result<P2pConn>;
fn local_addr(&self) -> Result<Multiaddr>;
}
#[async_trait]
pub trait DriverConnection: Send + Sync + Unpin {
fn id(&self) -> &str;
fn public_key(&self) -> &PublicKey;
fn local_addr(&self) -> &Multiaddr;
fn peer_addr(&self) -> &Multiaddr;
async fn accept(&mut self) -> Result<super::ProtocolStream>;
async fn connect(&mut self) -> Result<super::ProtocolStream>;
fn close(&mut self) -> Result<()>;
fn is_closed(&self) -> bool;
fn clone(&self) -> P2pConn;
fn actives(&self) -> usize;
}
pub trait DriverStream: Sync + Send + Unpin {
fn conn_id(&self) -> &str;
fn id(&self) -> &str;
fn public_key(&self) -> &PublicKey;
fn local_addr(&self) -> &Multiaddr;
fn peer_addr(&self) -> &Multiaddr;
fn poll_read(
self: std::pin::Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<Result<usize>>;
fn poll_write(
self: std::pin::Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize>>;
fn poll_flush(self: std::pin::Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<()>>;
fn poll_close(self: std::pin::Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<()>>;
}
}
pub enum ConnectTo<'a> {
PeerIdRef(&'a PeerId),
MultiaddrRef(&'a Multiaddr),
MultiaddrsRef(&'a [Multiaddr]),
PeerId(PeerId),
Multiaddr(Multiaddr),
Multiaddrs(Vec<Multiaddr>),
}
impl<'a> From<&'a PeerId> for ConnectTo<'a> {
fn from(value: &'a PeerId) -> Self {
Self::PeerIdRef(value)
}
}
impl<'a> From<&'a Multiaddr> for ConnectTo<'a> {
fn from(value: &'a Multiaddr) -> Self {
Self::MultiaddrRef(value)
}
}
impl<'a> From<&'a [Multiaddr]> for ConnectTo<'a> {
fn from(value: &'a [Multiaddr]) -> Self {
Self::MultiaddrsRef(value)
}
}
impl From<PeerId> for ConnectTo<'static> {
fn from(value: PeerId) -> Self {
Self::PeerId(value)
}
}
impl From<Multiaddr> for ConnectTo<'static> {
fn from(value: Multiaddr) -> Self {
Self::Multiaddr(value)
}
}
impl From<Vec<Multiaddr>> for ConnectTo<'static> {
fn from(value: Vec<Multiaddr>) -> Self {
Self::Multiaddrs(value)
}
}
impl TryFrom<&str> for ConnectTo<'static> {
type Error = crate::Error;
fn try_from(value: &str) -> std::result::Result<Self, Self::Error> {
if let Ok(peer_id) = value.parse::<PeerId>() {
return Ok(Self::PeerId(peer_id));
}
return Ok(Self::Multiaddr(value.parse::<Multiaddr>()?));
}
}
driver_wrapper!(
["A type wrapper of [`DriverTransport`](transport_syscall::DriverTransport)"]
Transport[transport_syscall::DriverTransport]
);
driver_wrapper!(
["A type wrapper of [`DriverListener`](transport_syscall::DriverListener)"]
TransportListener[transport_syscall::DriverListener]
);
impl TransportListener {
pub fn into_incoming(self) -> impl futures::Stream<Item = Result<P2pConn>> + Unpin {
Box::pin(unfold(self, |mut listener| async move {
let res = listener.accept().await;
Some((res, listener))
}))
}
}
driver_wrapper!(
["A type wrapper of [`DriverConnection`](transport_syscall::DriverConnection)"]
P2pConn[transport_syscall::DriverConnection]
);
impl P2pConn {
pub fn clone(&self) -> P2pConn {
self.0.clone()
}
pub async fn connect<I>(&mut self, protocols: I) -> Result<(ProtocolStream, I::Item)>
where
I: IntoIterator,
I::Item: AsRef<str>,
{
let mut stream = self.as_driver().connect().await?;
let (id, _) = dialer_select_proto(&mut stream, protocols, Version::V1).await?;
Ok((stream, id))
}
pub async fn ping(&mut self) -> Result<()> {
let (stream, _) = self.connect([PROTOCOL_IPFS_PING]).await?;
Ok(stream.xstack_ping().await?)
}
}
driver_wrapper!(
["A type wrapper of [`DriverStream`](transport_syscall::DriverStream)"]
ProtocolStream[transport_syscall::DriverStream]
);
#[cfg(feature = "global_register")]
#[cfg_attr(docsrs, doc(cfg(feature = "global_register")))]
impl ProtocolStream {
pub async fn connect<'a, C, E, I>(
target: C,
protos: I,
) -> crate::Result<(ProtocolStream, I::Item)>
where
C: TryInto<ConnectTo<'a>, Error = E>,
I: IntoIterator,
I::Item: AsRef<str>,
E: std::fmt::Debug,
{
Self::connect_with(crate::global_switch(), target, protos).await
}
pub async fn ping<'a, C, E>(target: C) -> crate::Result<()>
where
C: TryInto<ConnectTo<'a>, Error = E>,
E: std::fmt::Debug,
{
Self::ping_with(crate::global_switch(), target).await
}
}
impl ProtocolStream {
pub async fn connect_with<'a, C, E, I>(
switch: &Switch,
target: C,
protos: I,
) -> crate::Result<(ProtocolStream, I::Item)>
where
C: TryInto<ConnectTo<'a>, Error = E>,
I: IntoIterator,
I::Item: AsRef<str>,
E: std::fmt::Debug,
{
switch.connect(target, protos).await
}
pub async fn ping_with<'a, C, E>(switch: &Switch, target: C) -> crate::Result<()>
where
C: TryInto<ConnectTo<'a>, Error = E>,
E: std::fmt::Debug,
{
let (stream, _) = Self::connect_with(switch, target, [PROTOCOL_IPFS_PING]).await?;
stream.xstack_ping().await
}
}
impl AsyncWrite for ProtocolStream {
fn poll_write(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &[u8],
) -> std::task::Poll<std::io::Result<usize>> {
Pin::new(self.as_driver()).poll_write(cx, buf)
}
fn poll_flush(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<std::io::Result<()>> {
Pin::new(self.as_driver()).poll_flush(cx)
}
fn poll_close(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<std::io::Result<()>> {
Pin::new(self.as_driver()).poll_close(cx)
}
}
impl AsyncRead for ProtocolStream {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &mut [u8],
) -> std::task::Poll<std::io::Result<usize>> {
Pin::new(self.as_driver()).poll_read(cx, buf)
}
}