cloudpub_common/transport/
mod.rs1use crate::config::TransportConfig;
2use crate::utils::to_socket_addr;
3use anyhow::{Context, Result};
4use async_trait::async_trait;
5use std::fmt::{Debug, Display};
6#[cfg(unix)]
7use std::os::fd::RawFd;
8use std::time::Duration;
9use tokio::io::{AsyncRead, AsyncWrite};
10use tracing::error;
11
12use crate::protocol::message::Message as ProtocolMessage;
13
14#[async_trait]
15pub trait ProtobufStream {
16 async fn recv_message(&mut self) -> anyhow::Result<Option<ProtocolMessage>>;
17 async fn send_message(&mut self, msg: &ProtocolMessage) -> anyhow::Result<()>;
18 async fn close(&mut self) -> anyhow::Result<()>;
19}
20
21#[cfg(unix)]
22use anyhow::bail;
23
24mod tcp;
25pub use tcp::{Listener, NamedSocketAddr, SocketAddr, Stream, TcpTransport};
26
27mod websocket;
28pub use websocket::{WebsocketStream, WebsocketTransport};
29
30#[cfg(feature = "rustls")]
31pub mod rustls;
32#[cfg(feature = "rustls")]
33use rustls as tls;
34#[cfg(feature = "rustls")]
35pub use tls::TlsTransport;
36
37#[derive(Clone)]
38pub struct AddrMaybeCached {
39 pub addr: String,
40 pub socket_addr: Option<NamedSocketAddr>,
41}
42
43impl AddrMaybeCached {
44 pub fn new(addr: &str) -> AddrMaybeCached {
45 AddrMaybeCached {
46 addr: addr.to_string(),
47 socket_addr: None,
48 }
49 }
50
51 pub async fn resolve(&mut self) -> Result<()> {
52 match to_socket_addr(&self.addr).await {
53 Ok(s) => {
54 self.socket_addr = Some(NamedSocketAddr::Inet(s));
55 Ok(())
56 }
57 Err(e) => Err(e),
58 }
59 }
60}
61
62impl Display for AddrMaybeCached {
63 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
64 match self.socket_addr.as_ref() {
65 Some(s) => f.write_fmt(format_args!("{}", s)),
66 None => f.write_str(&self.addr),
67 }
68 }
69}
70
71#[async_trait]
73pub trait Transport: Debug + Send + Sync {
74 type Acceptor: Send + Sync;
75 type RawStream: Send + Sync;
76 type Stream: 'static + AsyncRead + AsyncWrite + ProtobufStream + Unpin + Send + Sync + Debug;
77
78 fn new(config: &TransportConfig) -> Result<Self>
79 where
80 Self: Sized;
81 #[cfg(unix)]
83 fn as_raw_fd(conn: &Self::Stream) -> RawFd;
84 fn hint(conn: &Self::Stream, opts: SocketOpts);
86 async fn bind(&self, addr: NamedSocketAddr) -> Result<Self::Acceptor>;
87 async fn accept(&self, a: &Self::Acceptor) -> Result<(Self::RawStream, SocketAddr)>;
89 async fn handshake(&self, conn: Self::RawStream) -> Result<Self::Stream>;
90 async fn connect(&self, addr: &AddrMaybeCached) -> Result<Self::Stream>;
91
92 fn get_header(&self, _name: &str) -> Option<String> {
93 None
94 }
95}
96
97#[derive(Debug, Clone, Copy)]
98pub struct Keepalive {
99 pub keepalive_secs: u64,
101 pub keepalive_interval: u64,
103}
104
105#[derive(Debug, Clone, Copy)]
106pub struct SocketOpts {
107 pub nodelay: Option<bool>,
109 pub keepalive: Option<Keepalive>,
111 pub priority: Option<u8>,
113}
114
115impl SocketOpts {
116 pub fn for_control_channel() -> SocketOpts {
118 SocketOpts {
119 nodelay: Some(true), keepalive: None,
121 priority: Some(0), }
123 }
124
125 pub fn for_data_channel() -> SocketOpts {
126 SocketOpts {
127 nodelay: Some(true), keepalive: None,
129 priority: Some(0),
130 }
131 }
132}
133
134#[cfg(unix)]
135pub fn set_reuse(s: &dyn std::os::fd::AsRawFd) -> Result<()> {
136 use libc;
137 use std::{io, mem};
138 unsafe {
139 let optval: libc::c_int = 1;
140 let ret = libc::setsockopt(
141 s.as_raw_fd(),
142 libc::SOL_SOCKET,
143 libc::SO_REUSEPORT | libc::SO_REUSEADDR,
144 &optval as *const _ as *const libc::c_void,
145 mem::size_of_val(&optval) as libc::socklen_t,
146 );
147 if ret != 0 {
148 bail!("Set sock option failed: {:?}", io::Error::last_os_error());
149 }
150 }
151 Ok(())
152}
153
154#[cfg(target_os = "linux")]
155pub fn set_low_latency(s: &dyn std::os::fd::AsRawFd) -> Result<()> {
156 use libc;
157 use std::{io, mem};
158
159 unsafe {
160 let fd = s.as_raw_fd();
161 let nodelay: libc::c_int = 1;
163 let ret = libc::setsockopt(
164 fd,
165 libc::IPPROTO_TCP,
166 libc::TCP_NODELAY,
167 &nodelay as *const _ as *const libc::c_void,
168 mem::size_of_val(&nodelay) as libc::socklen_t,
169 );
170 if ret != 0 {
171 bail!(
172 "Failed to set TCP_NODELAY: {:?}",
173 io::Error::last_os_error()
174 );
175 }
176
177 let quickack: libc::c_int = 1;
179 let ret = libc::setsockopt(
180 fd,
181 libc::IPPROTO_TCP,
182 libc::TCP_QUICKACK,
183 &quickack as *const _ as *const libc::c_void,
184 mem::size_of_val(&quickack) as libc::socklen_t,
185 );
186 if ret != 0 {
187 bail!(
188 "Failed to set TCP_QUICKACK: {:?}",
189 io::Error::last_os_error()
190 );
191 }
192 }
193 Ok(())
194}
195
196#[cfg(target_os = "linux")]
198pub fn set_priority(s: &dyn std::os::fd::AsRawFd, priority: libc::c_int) -> Result<()> {
199 use libc;
200 use std::{io, mem};
201
202 unsafe {
203 let fd = s.as_raw_fd();
204
205 let ret = libc::setsockopt(
207 fd,
208 libc::SOL_SOCKET,
209 libc::SO_PRIORITY,
210 &priority as *const _ as *const libc::c_void,
211 mem::size_of_val(&priority) as libc::socklen_t,
212 );
213 if ret != 0 {
214 bail!(
215 "Failed to set SO_PRIORITY: {:?}",
216 io::Error::last_os_error()
217 );
218 }
219 }
220
221 Ok(())
222}
223
224impl SocketOpts {
225 pub fn apply(&self, conn: &Stream) {
226 if let Some(v) = self.keepalive {
227 let keepalive_duration = Duration::from_secs(v.keepalive_secs);
228 let keepalive_interval = Duration::from_secs(v.keepalive_interval);
229 if let Err(e) = tcp::try_set_tcp_keepalive(conn, keepalive_duration, keepalive_interval)
230 .with_context(|| "Failed to set keepalive")
231 {
232 error!("{:#}", e);
233 }
234 }
235
236 match conn {
237 Stream::Tcp(conn) => {
238 #[cfg(unix)]
239 if let Err(e) = set_reuse(conn) {
240 error!("{:#}", e);
241 }
242 if let Some(nodelay) = self.nodelay {
243 #[cfg(not(target_os = "linux"))]
244 if let Err(e) = conn
245 .set_nodelay(nodelay)
246 .with_context(|| "Failed to set nodelay")
247 {
248 error!("{:#}", e);
249 }
250 #[cfg(target_os = "linux")]
251 if nodelay {
252 if let Err(e) = set_low_latency(conn) {
253 error!("Failed to set low latency options: {:#}", e);
254 }
255 }
256 }
257 #[cfg(target_os = "linux")]
258 if let Some(priority) = self.priority {
259 if let Err(e) = set_priority(conn, priority as libc::c_int) {
260 error!("Failed to set socket priority: {:#}", e);
261 }
262 }
263 }
264 #[cfg(unix)]
265 Stream::Unix(_conn) =>
266 {
267 #[cfg(target_os = "linux")]
268 if let Some(priority) = self.priority {
269 if let Err(e) = set_priority(_conn, priority as libc::c_int) {
270 error!("Failed to set socket priority: {:#}", e);
271 }
272 }
273 }
274 }
275 }
276}