xstack_tcp/
lib.rs

1//! A [***libp2p TCP transport protocol with TLS encryption***](https://docs.libp2p.io/concepts/secure-comm/tls/) implementation.
2
3use std::io::Result;
4use std::sync::atomic::{AtomicUsize, Ordering};
5use std::sync::Arc;
6
7use async_trait::async_trait;
8use futures::{StreamExt, TryStreamExt};
9
10use rasi::net::{TcpListener, TcpStream};
11
12use xstack::multiaddr::{is_tcp_transport, Multiaddr, Protocol, ToSockAddr};
13use xstack::transport_syscall::DriverTransport;
14use xstack::Switch;
15use xstack::{P2pConn, TransportListener};
16use xstack_tls::{TlsConn, TlsListener};
17
18/// The libp2p tcp transport implementation.
19#[derive(Default)]
20pub struct TcpTransport {
21    actives: Arc<AtomicUsize>,
22}
23
24#[async_trait]
25impl DriverTransport for TcpTransport {
26    fn name(&self) -> &str {
27        "tcp"
28    }
29    fn activities(&self) -> usize {
30        self.actives.load(Ordering::Relaxed)
31    }
32    async fn bind(&self, switch: &Switch, laddr: &Multiaddr) -> Result<TransportListener> {
33        let listener = TcpListener::bind(laddr.to_sockaddr()?).await?;
34
35        let local_addr = listener.local_addr()?;
36
37        let local_addr = Multiaddr::from(local_addr.ip())
38            .with(Protocol::Tcp(local_addr.port()))
39            .with(Protocol::Tls);
40
41        let incoming = listener.into_stream().filter_map(|stream| async move {
42            match stream {
43                Ok(stream) => match stream.peer_addr() {
44                    Ok(peer_addr) => {
45                        let peer_addr = Multiaddr::from(peer_addr.ip())
46                            .with(Protocol::Tcp(peer_addr.port()))
47                            .with(Protocol::Tls);
48
49                        return Some(Ok((stream, peer_addr)));
50                    }
51                    Err(_) => {
52                        return None;
53                    }
54                },
55                Err(err) => return Some(Err(err)),
56            }
57        });
58
59        Ok(TlsListener::new(
60            &switch,
61            local_addr,
62            Box::pin(incoming),
63            self.actives.clone(),
64        )
65        .await?
66        .into())
67    }
68
69    /// Connect to peer with remote peer [`raddr`](Multiaddr).
70    async fn connect(&self, switch: &Switch, raddr: &Multiaddr) -> Result<P2pConn> {
71        let stream = TcpStream::connect(raddr.to_sockaddr()?).await?;
72
73        let local_addr = stream.local_addr()?;
74
75        let local_addr = Multiaddr::from(local_addr.ip())
76            .with(Protocol::Tcp(local_addr.port()))
77            .with(Protocol::Tls);
78
79        let conn = TlsConn::connect(
80            &switch,
81            stream,
82            local_addr,
83            raddr.clone(),
84            self.actives.clone(),
85        )
86        .await?;
87
88        Ok(conn.into())
89    }
90
91    /// Check if this transport support the protocol stack represented by the `addr`.
92    fn multiaddr_hint(&self, addr: &Multiaddr) -> bool {
93        is_tcp_transport(addr)
94    }
95}
96
97#[cfg(test)]
98mod tests {
99    use async_trait::async_trait;
100    use xstack::{Result, Switch};
101    use xstack_spec::transport::{transport_specs, TransportSpecContext};
102
103    use super::*;
104
105    struct TcpMock;
106
107    #[async_trait]
108    impl TransportSpecContext for TcpMock {
109        async fn create_switch(&self) -> Result<Switch> {
110            let switch = Switch::new("test")
111                .transport(TcpTransport::default())
112                .transport_bind(["/ip4/127.0.0.1/tcp/0"])
113                .create()
114                .await?;
115
116            Ok(switch)
117        }
118    }
119
120    #[futures_test::test]
121    async fn test_specs() {
122        // pretty_env_logger::init();
123        transport_specs(TcpMock).await.unwrap();
124    }
125}