use std::io::Result;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use async_trait::async_trait;
use futures::{StreamExt, TryStreamExt};
use rasi::net::{TcpListener, TcpStream};
use xstack::multiaddr::{is_tcp_transport, Multiaddr, Protocol, ToSockAddr};
use xstack::transport_syscall::DriverTransport;
use xstack::Switch;
use xstack::{P2pConn, TransportListener};
use xstack_tls::{TlsConn, TlsListener};
/// TLS-secured TCP transport driver.
///
/// A value created through [`Default`] starts with its activity counter at
/// zero and holds the only reference to it.
#[derive(Default)]
pub struct TcpTransport {
    /// Shared activity counter.
    ///
    /// A clone of this handle is passed to every listener created by `bind`
    /// and every connection created by `connect`; `activities` reports the
    /// current value. The code that updates the counter is outside this file.
    actives: Arc<AtomicUsize>,
}
#[async_trait]
impl DriverTransport for TcpTransport {
fn name(&self) -> &str {
"tcp"
}
fn activities(&self) -> usize {
self.actives.load(Ordering::Relaxed)
}
async fn bind(&self, switch: &Switch, laddr: &Multiaddr) -> Result<TransportListener> {
let listener = TcpListener::bind(laddr.to_sockaddr()?).await?;
let local_addr = listener.local_addr()?;
let local_addr = Multiaddr::from(local_addr.ip())
.with(Protocol::Tcp(local_addr.port()))
.with(Protocol::Tls);
let incoming = listener.into_stream().filter_map(|stream| async move {
match stream {
Ok(stream) => match stream.peer_addr() {
Ok(peer_addr) => {
let peer_addr = Multiaddr::from(peer_addr.ip())
.with(Protocol::Tcp(peer_addr.port()))
.with(Protocol::Tls);
return Some(Ok((stream, peer_addr)));
}
Err(_) => {
return None;
}
},
Err(err) => return Some(Err(err)),
}
});
Ok(TlsListener::new(
&switch,
local_addr,
Box::pin(incoming),
self.actives.clone(),
)
.await?
.into())
}
async fn connect(&self, switch: &Switch, raddr: &Multiaddr) -> Result<P2pConn> {
let stream = TcpStream::connect(raddr.to_sockaddr()?).await?;
let local_addr = stream.local_addr()?;
let local_addr = Multiaddr::from(local_addr.ip())
.with(Protocol::Tcp(local_addr.port()))
.with(Protocol::Tls);
let conn = TlsConn::connect(
&switch,
stream,
local_addr,
raddr.clone(),
self.actives.clone(),
)
.await?;
Ok(conn.into())
}
fn multiaddr_hint(&self, addr: &Multiaddr) -> bool {
is_tcp_transport(addr)
}
}
#[cfg(test)]
mod tests {
    use super::*;

    use async_trait::async_trait;
    // These explicit imports take precedence over the `Result` glob-imported
    // from the parent module.
    use xstack::{Result, Switch};
    use xstack_spec::transport::{transport_specs, TransportSpecContext};

    /// Spec context that creates switches backed by [`TcpTransport`].
    struct TcpMock;

    #[async_trait]
    impl TransportSpecContext for TcpMock {
        /// Creates a switch named `"test"` that uses a default
        /// [`TcpTransport`] and is bound to `/ip4/127.0.0.1/tcp/0`.
        async fn create_switch(&self) -> Result<Switch> {
            Ok(Switch::new("test")
                .transport(TcpTransport::default())
                .transport_bind(["/ip4/127.0.0.1/tcp/0"])
                .create()
                .await?)
        }
    }

    /// Runs the shared transport spec suite against the TCP transport.
    #[futures_test::test]
    async fn test_specs() {
        transport_specs(TcpMock).await.unwrap();
    }
}