aerosocket_core/
transport.rs1use crate::error::Result;
7
8#[async_trait::async_trait]
10pub trait Transport: Send + Sync + 'static {
11 type Stream: TransportStream;
13
14 async fn accept(&self) -> Result<Self::Stream>;
16
17 fn local_addr(&self) -> Result<std::net::SocketAddr>;
19
20 async fn close(self) -> Result<()>;
22}
23
24#[async_trait::async_trait]
26pub trait TransportStream: Send + Sync {
27 async fn read(&mut self, buf: &mut [u8]) -> Result<usize>;
29
30 async fn write(&mut self, buf: &[u8]) -> Result<usize>;
32
33 async fn write_all(&mut self, buf: &[u8]) -> Result<()>;
35
36 async fn flush(&mut self) -> Result<()>;
38
39 async fn close(&mut self) -> Result<()>;
41
42 fn remote_addr(&self) -> Result<std::net::SocketAddr>;
44
45 fn local_addr(&self) -> Result<std::net::SocketAddr>;
47}
48
/// Tunable settings shared by the transport implementations in this module.
///
/// Every field is public, so a value can be built with a struct literal or
/// by overriding selected fields of `TransportConfig::default()`.
#[derive(Clone, Debug)]
pub struct TransportConfig {
    /// Upper bound on the size of a single frame (presumably in bytes).
    pub max_frame_size: usize,
    /// Upper bound on the size of a whole message (presumably in bytes).
    pub max_message_size: usize,
    /// Time allowed for the handshake.
    pub handshake_timeout: std::time::Duration,
    /// Idle time allowed on a connection.
    pub idle_timeout: std::time::Duration,
    /// No-delay flag (presumably maps to `TCP_NODELAY`; not consumed in this file).
    pub nodelay: bool,
    /// Optional receive buffer size; `None` leaves it unspecified.
    pub recv_buffer_size: Option<usize>,
    /// Optional send buffer size; `None` leaves it unspecified.
    pub send_buffer_size: Option<usize>,
}
67
68impl Default for TransportConfig {
69 fn default() -> Self {
70 Self {
71 max_frame_size: crate::protocol::constants::DEFAULT_MAX_FRAME_SIZE,
72 max_message_size: crate::protocol::constants::DEFAULT_MAX_MESSAGE_SIZE,
73 handshake_timeout: crate::protocol::constants::DEFAULT_HANDSHAKE_TIMEOUT,
74 idle_timeout: crate::protocol::constants::DEFAULT_IDLE_TIMEOUT,
75 nodelay: true,
76 recv_buffer_size: None,
77 send_buffer_size: None,
78 }
79 }
80}
81
/// TCP transport types, compiled only when an async runtime feature
/// (`tokio-runtime` or `async-std-runtime`) is enabled.
#[cfg(any(feature = "tokio-runtime", feature = "async-std-runtime"))]
pub mod tcp {
    use super::*;
    use std::net::SocketAddr;

    /// TCP transport handle. It currently holds only its configuration.
    #[derive(Debug)]
    // The stored config is not read anywhere in this module yet.
    #[allow(dead_code)]
    pub struct TcpTransport {
        config: TransportConfig,
    }

    impl TcpTransport {
        /// Builds a transport from `config`.
        ///
        /// As written, `_addr` is accepted but not used: no socket is
        /// opened and the call always returns `Ok`.
        pub async fn bind(_addr: SocketAddr, config: TransportConfig) -> Result<Self> {
            let transport = Self { config };
            Ok(transport)
        }

        /// Same as [`TcpTransport::bind`] with `TransportConfig::default()`.
        pub async fn bind_default(addr: SocketAddr) -> Result<Self> {
            let config = TransportConfig::default();
            Self::bind(addr, config).await
        }
    }

    /// TCP stream handle. It currently holds only its configuration.
    #[derive(Debug)]
    // The stored config is not read anywhere in this module yet.
    #[allow(dead_code)]
    pub struct TcpStream {
        config: TransportConfig,
    }

    impl TcpStream {
        /// Wraps `config` in a new stream handle.
        pub fn new(config: TransportConfig) -> Self {
            TcpStream { config }
        }
    }
}
121
/// TLS transport types, compiled only when an async runtime feature
/// (`tokio-runtime` or `async-std-runtime`) AND `transport-tls` are enabled.
#[cfg(all(
    any(feature = "tokio-runtime", feature = "async-std-runtime"),
    feature = "transport-tls"
))]
pub mod tls {
    use super::*;
    use std::net::SocketAddr;

    /// TLS transport handle. It currently holds only its configuration.
    #[derive(Debug)]
    // The stored config is not read anywhere in this module yet.
    #[allow(dead_code)]
    pub struct TlsTransport {
        config: TransportConfig,
    }

    impl TlsTransport {
        /// Builds a transport from `config`.
        ///
        /// As written, `_addr` is accepted but not used: no socket is
        /// opened and the call always returns `Ok`.
        pub async fn bind(_addr: SocketAddr, config: TransportConfig) -> Result<Self> {
            let transport = Self { config };
            Ok(transport)
        }
    }

    /// TLS stream handle. It currently holds only its configuration.
    #[derive(Debug)]
    // The stored config is not read anywhere in this module yet.
    #[allow(dead_code)]
    pub struct TlsStream {
        config: TransportConfig,
    }

    impl TlsStream {
        /// Wraps `config` in a new stream handle.
        pub fn new(config: TransportConfig) -> Self {
            TlsStream { config }
        }
    }
}
157
/// In-memory stand-ins for a transport and its streams, built for tests only.
#[cfg(test)]
pub mod mock {
    use super::*;
    use std::net::SocketAddr;
    use std::sync::mpsc::{self, Receiver, Sender};

    /// Transport fed by a std `mpsc` channel: whatever is sent on the paired
    /// sender is later returned by [`MockTransport::accept`].
    #[derive(Debug)]
    pub struct MockTransport {
        receiver: Receiver<MockStream>,
        local_addr: SocketAddr,
    }

    impl MockTransport {
        /// Creates the transport together with the sender used to queue
        /// streams for it. The local address is fixed to `127.0.0.1:0`.
        pub fn new() -> (Self, Sender<MockStream>) {
            let (tx, rx) = mpsc::channel();
            (
                Self {
                    receiver: rx,
                    local_addr: "127.0.0.1:0".parse().unwrap(),
                },
                tx,
            )
        }

        /// Blocks until a queued stream is available and returns it.
        ///
        /// # Errors
        ///
        /// Returns `Error::Connection` when the channel has been closed
        /// (every sender dropped and nothing left in the queue).
        pub fn accept(&self) -> Result<MockStream> {
            self.receiver
                .recv()
                .map_err(|_| crate::Error::Connection("Mock transport closed".to_string()))
        }

        /// Returns the fixed local address.
        pub fn local_addr(&self) -> Result<SocketAddr> {
            Ok(self.local_addr)
        }
    }

    /// Stream backed by a byte buffer: writes append to the buffer, reads
    /// consume from its front.
    #[derive(Debug, Clone)]
    pub struct MockStream {
        data: Vec<u8>,
        remote_addr: SocketAddr,
        local_addr: SocketAddr,
    }

    impl MockStream {
        /// Creates a stream pre-loaded with `data`, using the fixed
        /// addresses `127.0.0.1:12345` (remote) and `127.0.0.1:8080` (local).
        pub fn new(data: Vec<u8>) -> Self {
            let remote_addr = "127.0.0.1:12345".parse().unwrap();
            let local_addr = "127.0.0.1:8080".parse().unwrap();
            Self {
                data,
                remote_addr,
                local_addr,
            }
        }

        /// Moves up to `buf.len()` buffered bytes into `buf`, removing them
        /// from the stream, and returns how many were moved.
        pub fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
            let count = buf.len().min(self.data.len());
            // Draining the front `count` bytes both yields them and removes them.
            for (slot, byte) in buf.iter_mut().zip(self.data.drain(..count)) {
                *slot = byte;
            }
            Ok(count)
        }

        /// Appends all of `buf` to the internal buffer and returns its length.
        pub fn write(&mut self, buf: &[u8]) -> Result<usize> {
            let written = buf.len();
            self.data.extend_from_slice(buf);
            Ok(written)
        }

        /// Returns the fixed remote address.
        pub fn remote_addr(&self) -> Result<SocketAddr> {
            Ok(self.remote_addr)
        }

        /// Returns the fixed local address.
        pub fn local_addr(&self) -> Result<SocketAddr> {
            Ok(self.local_addr)
        }
    }
}
241
#[cfg(test)]
mod tests {
    use super::*;

    /// The default config takes its frame limit from the protocol constants
    /// and enables `nodelay`.
    #[test]
    fn test_transport_config_default() {
        use crate::protocol::constants::DEFAULT_MAX_FRAME_SIZE;

        let defaults = TransportConfig::default();
        assert_eq!(defaults.max_frame_size, DEFAULT_MAX_FRAME_SIZE);
        assert!(defaults.nodelay);
    }

    #[cfg(test)]
    mod mock_tests {
        use crate::transport::mock::*;

        /// A stream pushed through the sender comes back out of `accept`.
        #[test]
        fn test_mock_transport() {
            let (transport, tx) = MockTransport::new();
            tx.send(MockStream::new(b"hello".to_vec())).unwrap();

            let peer = transport.accept().unwrap().remote_addr().unwrap();
            assert_eq!(peer.to_string(), "127.0.0.1:12345");
        }

        /// Bytes written to a stream can be read back unchanged.
        #[test]
        fn test_mock_stream() {
            let mut stream = MockStream::new(Vec::new());

            assert_eq!(stream.write(b"hello").unwrap(), 5);

            let mut scratch = [0u8; 10];
            let count = stream.read(&mut scratch).unwrap();
            assert_eq!(count, 5);
            assert_eq!(&scratch[..5], b"hello");
        }
    }
}