1use std::io::Result;
4use std::sync::atomic::{AtomicUsize, Ordering};
5use std::sync::Arc;
6
7use async_trait::async_trait;
8use futures::{StreamExt, TryStreamExt};
9
10use rasi::net::{TcpListener, TcpStream};
11
12use xstack::multiaddr::{is_tcp_transport, Multiaddr, Protocol, ToSockAddr};
13use xstack::transport_syscall::DriverTransport;
14use xstack::Switch;
15use xstack::{P2pConn, TransportListener};
16use xstack_tls::{TlsConn, TlsListener};
17
18#[derive(Default)]
20pub struct TcpTransport {
21 actives: Arc<AtomicUsize>,
22}
23
24#[async_trait]
25impl DriverTransport for TcpTransport {
26 fn name(&self) -> &str {
27 "tcp"
28 }
29 fn activities(&self) -> usize {
30 self.actives.load(Ordering::Relaxed)
31 }
32 async fn bind(&self, switch: &Switch, laddr: &Multiaddr) -> Result<TransportListener> {
33 let listener = TcpListener::bind(laddr.to_sockaddr()?).await?;
34
35 let local_addr = listener.local_addr()?;
36
37 let local_addr = Multiaddr::from(local_addr.ip())
38 .with(Protocol::Tcp(local_addr.port()))
39 .with(Protocol::Tls);
40
41 let incoming = listener.into_stream().filter_map(|stream| async move {
42 match stream {
43 Ok(stream) => match stream.peer_addr() {
44 Ok(peer_addr) => {
45 let peer_addr = Multiaddr::from(peer_addr.ip())
46 .with(Protocol::Tcp(peer_addr.port()))
47 .with(Protocol::Tls);
48
49 return Some(Ok((stream, peer_addr)));
50 }
51 Err(_) => {
52 return None;
53 }
54 },
55 Err(err) => return Some(Err(err)),
56 }
57 });
58
59 Ok(TlsListener::new(
60 &switch,
61 local_addr,
62 Box::pin(incoming),
63 self.actives.clone(),
64 )
65 .await?
66 .into())
67 }
68
69 async fn connect(&self, switch: &Switch, raddr: &Multiaddr) -> Result<P2pConn> {
71 let stream = TcpStream::connect(raddr.to_sockaddr()?).await?;
72
73 let local_addr = stream.local_addr()?;
74
75 let local_addr = Multiaddr::from(local_addr.ip())
76 .with(Protocol::Tcp(local_addr.port()))
77 .with(Protocol::Tls);
78
79 let conn = TlsConn::connect(
80 &switch,
81 stream,
82 local_addr,
83 raddr.clone(),
84 self.actives.clone(),
85 )
86 .await?;
87
88 Ok(conn.into())
89 }
90
91 fn multiaddr_hint(&self, addr: &Multiaddr) -> bool {
93 is_tcp_transport(addr)
94 }
95}
96
97#[cfg(test)]
98mod tests {
99 use async_trait::async_trait;
100 use xstack::{Result, Switch};
101 use xstack_spec::transport::{transport_specs, TransportSpecContext};
102
103 use super::*;
104
105 struct TcpMock;
106
107 #[async_trait]
108 impl TransportSpecContext for TcpMock {
109 async fn create_switch(&self) -> Result<Switch> {
110 let switch = Switch::new("test")
111 .transport(TcpTransport::default())
112 .transport_bind(["/ip4/127.0.0.1/tcp/0"])
113 .create()
114 .await?;
115
116 Ok(switch)
117 }
118 }
119
120 #[futures_test::test]
121 async fn test_specs() {
122 transport_specs(TcpMock).await.unwrap();
124 }
125}