airio_tcp/
lib.rs

1mod listener;
2mod stream;
3
4use std::{io, net::SocketAddr};
5
6use airio_core::Transport;
7use futures::{
8    FutureExt, TryFutureExt,
9    future::{BoxFuture, Ready},
10};
11
12pub use listener::ListenStream;
13use socket2::{Domain, Protocol, Socket, Type};
14pub use stream::TcpStream;
15use tokio::net::TcpListener;
16
17#[derive(Clone, Debug)]
18pub struct Config {
19    ttl: Option<u32>,
20    nodelay: bool,
21    backlog: u32,
22}
23
24impl Config {
25    pub fn new() -> Self {
26        Self {
27            ttl: None,
28            nodelay: true,
29            backlog: 1024,
30        }
31    }
32
33    pub fn ttl(mut self, value: u32) -> Self {
34        self.ttl = Some(value);
35        self
36    }
37
38    pub fn nodelay(mut self, value: bool) -> Self {
39        self.nodelay = value;
40        self
41    }
42
43    pub fn listen_backlog(mut self, backlog: u32) -> Self {
44        self.backlog = backlog;
45        self
46    }
47
48    fn create_socket(&self, socket_addr: SocketAddr) -> io::Result<Socket> {
49        let socket = Socket::new(
50            Domain::for_address(socket_addr),
51            Type::STREAM,
52            Some(Protocol::TCP),
53        )?;
54        if socket_addr.is_ipv6() {
55            socket.set_only_v6(true)?;
56        }
57        if let Some(ttl) = self.ttl {
58            socket.set_ttl(ttl)?;
59        }
60        socket.set_nodelay(self.nodelay)?;
61        socket.set_reuse_address(true)?;
62        socket.set_nonblocking(true)?;
63        Ok(socket)
64    }
65}
66
67impl Default for Config {
68    fn default() -> Self {
69        Self::new()
70    }
71}
72
73impl Transport for Config {
74    type Output = TcpStream;
75    type Error = io::Error;
76    type Dialer = BoxFuture<'static, Result<Self::Output, Self::Error>>;
77    type ListenerUpgrade = Ready<Result<Self::Output, Self::Error>>;
78    type Listener = ListenStream;
79
80    fn connect(&self, addr: SocketAddr) -> Result<Self::Dialer, Self::Error> {
81        let fut = tokio::net::TcpStream::connect(addr)
82            .map_ok(TcpStream::from)
83            .boxed();
84        Ok(fut)
85    }
86
87    fn listen(&self, addr: SocketAddr) -> Result<Self::Listener, Self::Error> {
88        let socket = self.create_socket(addr)?;
89        socket.bind(&addr.into())?;
90        socket.listen(self.backlog as _)?;
91        socket.set_nonblocking(true)?;
92        let listener = TcpListener::from_std(socket.into())?;
93        Ok(ListenStream::new(listener, addr))
94    }
95}