cloudpub_common/transport/
mod.rs

1use crate::config::TransportConfig;
2use crate::utils::to_socket_addr;
3use anyhow::{Context, Result};
4use async_trait::async_trait;
5use std::fmt::{Debug, Display};
6#[cfg(unix)]
7use std::os::fd::RawFd;
8use std::time::Duration;
9use tokio::io::{AsyncRead, AsyncWrite};
10use tracing::error;
11
12use crate::protocol::message::Message as ProtocolMessage;
13
14#[async_trait]
15pub trait ProtobufStream {
16    async fn recv_message(&mut self) -> anyhow::Result<Option<ProtocolMessage>>;
17    async fn send_message(&mut self, msg: &ProtocolMessage) -> anyhow::Result<()>;
18    async fn close(&mut self) -> anyhow::Result<()>;
19}
20
21#[cfg(unix)]
22use anyhow::bail;
23
24mod tcp;
25pub use tcp::{Listener, NamedSocketAddr, SocketAddr, Stream, TcpTransport};
26
27mod websocket;
28pub use websocket::{WebsocketStream, WebsocketTransport};
29
30#[cfg(feature = "rustls")]
31pub mod rustls;
32#[cfg(feature = "rustls")]
33use rustls as tls;
34#[cfg(feature = "rustls")]
35pub use tls::TlsTransport;
36
37#[derive(Clone)]
38pub struct AddrMaybeCached {
39    pub addr: String,
40    pub socket_addr: Option<NamedSocketAddr>,
41}
42
43impl AddrMaybeCached {
44    pub fn new(addr: &str) -> AddrMaybeCached {
45        AddrMaybeCached {
46            addr: addr.to_string(),
47            socket_addr: None,
48        }
49    }
50
51    pub async fn resolve(&mut self) -> Result<()> {
52        match to_socket_addr(&self.addr).await {
53            Ok(s) => {
54                self.socket_addr = Some(NamedSocketAddr::Inet(s));
55                Ok(())
56            }
57            Err(e) => Err(e),
58        }
59    }
60}
61
62impl Display for AddrMaybeCached {
63    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
64        match self.socket_addr.as_ref() {
65            Some(s) => f.write_fmt(format_args!("{}", s)),
66            None => f.write_str(&self.addr),
67        }
68    }
69}
70
71/// Specify a transport layer, like TCP, TLS
72#[async_trait]
73pub trait Transport: Debug + Send + Sync {
74    type Acceptor: Send + Sync;
75    type RawStream: Send + Sync;
76    type Stream: 'static + AsyncRead + AsyncWrite + ProtobufStream + Unpin + Send + Sync + Debug;
77
78    fn new(config: &TransportConfig) -> Result<Self>
79    where
80        Self: Sized;
81    /// Get the stream id, which is used to identify the transport layer
82    #[cfg(unix)]
83    fn as_raw_fd(conn: &Self::Stream) -> RawFd;
84    /// Provide the transport with socket options, which can be handled at the need of the transport
85    fn hint(conn: &Self::Stream, opts: SocketOpts);
86    async fn bind(&self, addr: NamedSocketAddr) -> Result<Self::Acceptor>;
87    /// accept must be cancel safe
88    async fn accept(&self, a: &Self::Acceptor) -> Result<(Self::RawStream, SocketAddr)>;
89    async fn handshake(&self, conn: Self::RawStream) -> Result<Self::Stream>;
90    async fn connect(&self, addr: &AddrMaybeCached) -> Result<Self::Stream>;
91
92    fn get_header(&self, _name: &str) -> Option<String> {
93        None
94    }
95}
96
/// Keepalive timing, with both values given in whole seconds.
///
/// `SocketOpts::apply` converts each field with `Duration::from_secs`.
#[derive(Debug, Clone, Copy)]
pub struct Keepalive {
    /// `tcp_keepalive_time` if the underlying protocol is TCP.
    pub keepalive_secs: u64,
    /// `tcp_keepalive_intvl` if the underlying protocol is TCP.
    pub keepalive_interval: u64,
}
104
105#[derive(Debug, Clone, Copy)]
106pub struct SocketOpts {
107    // None means do not change
108    pub nodelay: Option<bool>,
109    // keepalive must be Some or None at the same time, or the behavior will be platform-dependent
110    pub keepalive: Option<Keepalive>,
111    // SO_PRIORITY
112    pub priority: Option<u8>,
113}
114
115impl SocketOpts {
116    /// Socket options for the control channel
117    pub fn for_control_channel() -> SocketOpts {
118        SocketOpts {
119            nodelay: Some(true), // Always set nodelay for the control channel
120            keepalive: None,
121            priority: Some(0), // Set high priority for the control channel
122        }
123    }
124
125    pub fn for_data_channel() -> SocketOpts {
126        SocketOpts {
127            nodelay: Some(true), // Always set nodelay for the data channel
128            keepalive: None,
129            priority: Some(0),
130        }
131    }
132}
133
134#[cfg(unix)]
135pub fn set_reuse(s: &dyn std::os::fd::AsRawFd) -> Result<()> {
136    use libc;
137    use std::{io, mem};
138    unsafe {
139        let optval: libc::c_int = 1;
140        let ret = libc::setsockopt(
141            s.as_raw_fd(),
142            libc::SOL_SOCKET,
143            libc::SO_REUSEPORT | libc::SO_REUSEADDR,
144            &optval as *const _ as *const libc::c_void,
145            mem::size_of_val(&optval) as libc::socklen_t,
146        );
147        if ret != 0 {
148            bail!("Set sock option failed: {:?}", io::Error::last_os_error());
149        }
150    }
151    Ok(())
152}
153
154#[cfg(target_os = "linux")]
155pub fn set_low_latency(s: &dyn std::os::fd::AsRawFd) -> Result<()> {
156    use libc;
157    use std::{io, mem};
158
159    unsafe {
160        let fd = s.as_raw_fd();
161        // 1. TCP_NODELAY - Disable Nagle's algorithm for immediate packet sending
162        let nodelay: libc::c_int = 1;
163        let ret = libc::setsockopt(
164            fd,
165            libc::IPPROTO_TCP,
166            libc::TCP_NODELAY,
167            &nodelay as *const _ as *const libc::c_void,
168            mem::size_of_val(&nodelay) as libc::socklen_t,
169        );
170        if ret != 0 {
171            bail!(
172                "Failed to set TCP_NODELAY: {:?}",
173                io::Error::last_os_error()
174            );
175        }
176
177        // 2. TCP_QUICKACK - Enable quick ACK mode to reduce ACK delay
178        let quickack: libc::c_int = 1;
179        let ret = libc::setsockopt(
180            fd,
181            libc::IPPROTO_TCP,
182            libc::TCP_QUICKACK,
183            &quickack as *const _ as *const libc::c_void,
184            mem::size_of_val(&quickack) as libc::socklen_t,
185        );
186        if ret != 0 {
187            bail!(
188                "Failed to set TCP_QUICKACK: {:?}",
189                io::Error::last_os_error()
190            );
191        }
192    }
193    Ok(())
194}
195
196// Set socket priority: 0 - lowest (default), 7 - higest
197#[cfg(target_os = "linux")]
198pub fn set_priority(s: &dyn std::os::fd::AsRawFd, priority: libc::c_int) -> Result<()> {
199    use libc;
200    use std::{io, mem};
201
202    unsafe {
203        let fd = s.as_raw_fd();
204
205        // 1. SO_PRIORITY - Set high priority for the socket
206        let ret = libc::setsockopt(
207            fd,
208            libc::SOL_SOCKET,
209            libc::SO_PRIORITY,
210            &priority as *const _ as *const libc::c_void,
211            mem::size_of_val(&priority) as libc::socklen_t,
212        );
213        if ret != 0 {
214            bail!(
215                "Failed to set SO_PRIORITY: {:?}",
216                io::Error::last_os_error()
217            );
218        }
219    }
220
221    Ok(())
222}
223
224impl SocketOpts {
225    pub fn apply(&self, conn: &Stream) {
226        if let Some(v) = self.keepalive {
227            let keepalive_duration = Duration::from_secs(v.keepalive_secs);
228            let keepalive_interval = Duration::from_secs(v.keepalive_interval);
229            if let Err(e) = tcp::try_set_tcp_keepalive(conn, keepalive_duration, keepalive_interval)
230                .with_context(|| "Failed to set keepalive")
231            {
232                error!("{:#}", e);
233            }
234        }
235
236        match conn {
237            Stream::Tcp(conn) => {
238                #[cfg(unix)]
239                if let Err(e) = set_reuse(conn) {
240                    error!("{:#}", e);
241                }
242                if let Some(nodelay) = self.nodelay {
243                    #[cfg(not(target_os = "linux"))]
244                    if let Err(e) = conn
245                        .set_nodelay(nodelay)
246                        .with_context(|| "Failed to set nodelay")
247                    {
248                        error!("{:#}", e);
249                    }
250                    #[cfg(target_os = "linux")]
251                    if nodelay {
252                        if let Err(e) = set_low_latency(conn) {
253                            error!("Failed to set low latency options: {:#}", e);
254                        }
255                    }
256                }
257                #[cfg(target_os = "linux")]
258                if let Some(priority) = self.priority {
259                    if let Err(e) = set_priority(conn, priority as libc::c_int) {
260                        error!("Failed to set socket priority: {:#}", e);
261                    }
262                }
263            }
264            #[cfg(unix)]
265            Stream::Unix(_conn) =>
266            {
267                #[cfg(target_os = "linux")]
268                if let Some(priority) = self.priority {
269                    if let Err(e) = set_priority(_conn, priority as libc::c_int) {
270                        error!("Failed to set socket priority: {:#}", e);
271                    }
272                }
273            }
274        }
275    }
276}