irontide_session/transport.rs
1//! Network transport abstraction layer.
2//!
3//! Provides [`NetworkFactory`] — a factory for creating TCP listeners and
4//! connections using either real tokio sockets (production) or pluggable
5//! in-memory channels (testing/simulation).
6//!
7//! The key abstraction is [`TransportListener`], an object-safe trait for
8//! accepting inbound connections, and [`BoxedStream`], a type-erased
9//! async read/write stream.
10
11use std::future::Future;
12use std::io;
13use std::net::SocketAddr;
14use std::pin::Pin;
15use std::task::{Context, Poll};
16
17use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
18use tokio::net::TcpListener;
19use tokio::net::TcpStream;
20
21use irontide_utp::UdpTransport;
22
23// ---------------------------------------------------------------------------
24// Type aliases — tame clippy::type_complexity
25// ---------------------------------------------------------------------------
26
27/// Boxed future returned by [`TransportListener::accept`].
28type AcceptFuture<'a> =
29 Pin<Box<dyn Future<Output = io::Result<(BoxedStream, SocketAddr)>> + Send + 'a>>;
30
31/// Closure type for [`NetworkFactory`]'s bind operation.
32type BindFn = Box<
33 dyn Fn(
34 SocketAddr,
35 ) -> Pin<Box<dyn Future<Output = io::Result<Box<dyn TransportListener>>> + Send>>
36 + Send
37 + Sync,
38>;
39
40/// Closure type for [`NetworkFactory`]'s connect operation.
41type ConnectFn = Box<
42 dyn Fn(SocketAddr) -> Pin<Box<dyn Future<Output = io::Result<BoxedStream>> + Send>>
43 + Send
44 + Sync,
45>;
46
47/// Closure type for [`NetworkFactory`]'s UDP bind operation.
48///
49/// Stage U entry point: lets `irontide-sim` plug a [`SimNetwork`]-backed
50/// [`UdpTransport`] into the production `irontide-utp` socket actor. The
51/// production tokio factory leaves this `None` — the session falls back
52/// to `UtpSocket::bind` directly so the FD-level DSCP / TCLASS setsockopt
53/// path remains in `irontide-utp::socket::bind`.
54type BindUdpFn = Box<
55 dyn Fn(SocketAddr) -> Pin<Box<dyn Future<Output = io::Result<Box<dyn UdpTransport>>> + Send>>
56 + Send
57 + Sync,
58>;
59
60// ---------------------------------------------------------------------------
61// BoxedStream
62// ---------------------------------------------------------------------------
63
64/// A type-erased bidirectional async stream.
65///
66/// Wraps any `AsyncRead + AsyncWrite + Unpin + Send` type behind a single
67/// trait object. This avoids the Rust limitation that `dyn` can only name
68/// one non-auto trait.
69pub struct BoxedStream {
70 inner: Pin<Box<dyn StreamRw + Send>>,
71}
72
73impl std::fmt::Debug for BoxedStream {
74 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
75 f.debug_struct("BoxedStream").finish_non_exhaustive()
76 }
77}
78
79/// Combined read/write supertrait for dyn compatibility.
80trait StreamRw: AsyncRead + AsyncWrite + Unpin {}
81impl<T: AsyncRead + AsyncWrite + Unpin> StreamRw for T {}
82
83impl BoxedStream {
84 /// Wrap any async read/write stream into a [`BoxedStream`].
85 pub fn new<S: AsyncRead + AsyncWrite + Unpin + Send + 'static>(stream: S) -> Self {
86 Self {
87 inner: Box::pin(stream),
88 }
89 }
90}
91
92impl AsyncRead for BoxedStream {
93 fn poll_read(
94 mut self: Pin<&mut Self>,
95 cx: &mut Context<'_>,
96 buf: &mut ReadBuf<'_>,
97 ) -> Poll<io::Result<()>> {
98 self.inner.as_mut().poll_read(cx, buf)
99 }
100}
101
102impl AsyncWrite for BoxedStream {
103 fn poll_write(
104 mut self: Pin<&mut Self>,
105 cx: &mut Context<'_>,
106 buf: &[u8],
107 ) -> Poll<io::Result<usize>> {
108 self.inner.as_mut().poll_write(cx, buf)
109 }
110
111 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
112 self.inner.as_mut().poll_flush(cx)
113 }
114
115 fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
116 self.inner.as_mut().poll_shutdown(cx)
117 }
118}
119
120impl Unpin for BoxedStream {}
121
122// ---------------------------------------------------------------------------
123// TransportListener
124// ---------------------------------------------------------------------------
125
126/// An object-safe listener that accepts inbound connections.
127///
128/// Implemented by [`TokioListener`] for real TCP sockets; simulation backends
129/// provide their own implementation backed by in-memory channels.
130///
131/// The `accept` method returns a boxed future for dyn compatibility.
132pub trait TransportListener: Send + Sync {
133 /// Accept the next inbound connection.
134 fn accept(&mut self) -> AcceptFuture<'_>;
135
136 /// Return the local address this listener is bound to.
137 ///
138 /// # Errors
139 ///
140 /// Returns an error if the session is shut down.
141 fn local_addr(&self) -> io::Result<SocketAddr>;
142}
143
144// ---------------------------------------------------------------------------
145// TokioListener
146// ---------------------------------------------------------------------------
147
148/// A [`TransportListener`] backed by a real [`tokio::net::TcpListener`].
149pub struct TokioListener(pub TcpListener);
150
151impl TransportListener for TokioListener {
152 fn accept(&mut self) -> AcceptFuture<'_> {
153 Box::pin(async move {
154 let (stream, addr) = self.0.accept().await?;
155 // RST on close instead of FIN → skip TIME_WAIT (see connect_tcp).
156 #[allow(deprecated)]
157 let _ = stream.set_linger(Some(std::time::Duration::ZERO));
158 Ok((BoxedStream::new(stream), addr))
159 })
160 }
161
162 fn local_addr(&self) -> io::Result<SocketAddr> {
163 self.0.local_addr()
164 }
165}
166
167// ---------------------------------------------------------------------------
168// NetworkFactory
169// ---------------------------------------------------------------------------
170
171/// Factory for creating TCP listeners and outbound connections.
172///
173/// In production, use [`NetworkFactory::tokio()`] to get a factory that
174/// delegates to real tokio networking. For simulation/testing, construct
175/// via [`NetworkFactory::new()`] with custom closures that route through
176/// in-memory channels.
177pub struct NetworkFactory {
178 bind_tcp: BindFn,
179 connect_tcp: ConnectFn,
180 bind_udp: Option<BindUdpFn>,
181 is_simulated: bool,
182}
183
184impl NetworkFactory {
185 /// Create a factory with custom bind/connect closures.
186 ///
187 /// This is the primary constructor for simulation backends. The
188 /// resulting factory has no UDP bind installed; chain
189 /// [`Self::with_bind_udp`] to plumb a sim UDP transport into the
190 /// `irontide-utp` socket actor.
191 #[must_use]
192 pub fn new(bind_tcp: BindFn, connect_tcp: ConnectFn, is_simulated: bool) -> Self {
193 Self {
194 bind_tcp,
195 connect_tcp,
196 bind_udp: None,
197 is_simulated,
198 }
199 }
200
201 /// Install a UDP bind closure on this factory.
202 ///
203 /// Used by simulation backends to route uTP datagrams through the
204 /// in-memory packet bus. Production callers should leave this unset
205 /// — `irontide-session::session` falls back to `UtpSocket::bind` so
206 /// FD-level DSCP / TCLASS configuration stays in `irontide-utp`.
207 #[must_use]
208 pub fn with_bind_udp(mut self, bind_udp: BindUdpFn) -> Self {
209 self.bind_udp = Some(bind_udp);
210 self
211 }
212
213 /// Create a factory that uses real tokio TCP networking.
214 #[must_use]
215 pub fn tokio() -> Self {
216 Self {
217 bind_tcp: Box::new(|addr| {
218 Box::pin(async move {
219 let listener = TcpListener::bind(addr).await?;
220 Ok(Box::new(TokioListener(listener)) as Box<dyn TransportListener>)
221 })
222 }),
223 connect_tcp: Box::new(|addr| {
224 Box::pin(async move {
225 let stream = TcpStream::connect(addr).await?;
226 // RST on close instead of FIN → skip TIME_WAIT.
227 // Peer connections are ephemeral; TIME_WAIT accumulation
228 // degrades performance across rapid reconnection cycles.
229 // Safe: linger(0) sends RST immediately, never blocks.
230 #[allow(deprecated)]
231 let _ = stream.set_linger(Some(std::time::Duration::ZERO));
232 Ok(BoxedStream::new(stream))
233 })
234 }),
235 bind_udp: None,
236 is_simulated: false,
237 }
238 }
239
240 /// Bind a TCP listener on the given address.
241 ///
242 /// # Errors
243 ///
244 /// Returns an error if the connection or binding fails.
245 pub async fn bind_tcp(&self, addr: SocketAddr) -> io::Result<Box<dyn TransportListener>> {
246 (self.bind_tcp)(addr).await
247 }
248
249 /// Open an outbound TCP connection to the given address.
250 ///
251 /// # Errors
252 ///
253 /// Returns an error if the connection or binding fails.
254 pub async fn connect_tcp(&self, addr: SocketAddr) -> io::Result<BoxedStream> {
255 (self.connect_tcp)(addr).await
256 }
257
258 /// Returns `true` if this factory uses simulated networking.
259 #[must_use]
260 pub fn is_simulated(&self) -> bool {
261 self.is_simulated
262 }
263
264 /// Returns `true` if this factory has a UDP bind closure installed.
265 ///
266 /// Stage U: callers branch on this to choose between the production
267 /// `UtpSocket::bind` path (DSCP / TCLASS aware) and the sim
268 /// `UtpSocket::bind_with_transport` path (in-memory packet bus).
269 #[must_use]
270 pub fn has_bind_udp(&self) -> bool {
271 self.bind_udp.is_some()
272 }
273
274 /// Bind a UDP transport at the given address.
275 ///
276 /// # Errors
277 ///
278 /// Returns `Unsupported` if no UDP bind closure is installed
279 /// (the tokio factory's default), otherwise the underlying
280 /// transport's I/O error.
281 pub async fn bind_udp(&self, addr: SocketAddr) -> io::Result<Box<dyn UdpTransport>> {
282 match self.bind_udp.as_ref() {
283 Some(f) => f(addr).await,
284 None => Err(io::Error::new(
285 io::ErrorKind::Unsupported,
286 "factory has no UDP bind installed",
287 )),
288 }
289 }
290}
291
292// ---------------------------------------------------------------------------
293// Tests
294// ---------------------------------------------------------------------------
295
296#[cfg(test)]
297mod tests {
298 use super::*;
299 use tokio::io::{AsyncReadExt, AsyncWriteExt};
300
301 #[test]
302 fn tokio_factory_creation() {
303 let _factory = NetworkFactory::tokio();
304 }
305
306 #[test]
307 fn tokio_factory_is_not_simulated() {
308 let factory = NetworkFactory::tokio();
309 assert!(!factory.is_simulated());
310 }
311
312 #[tokio::test]
313 async fn tokio_bind_and_accept() {
314 let factory = NetworkFactory::tokio();
315 let addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
316 let listener = factory.bind_tcp(addr).await.unwrap();
317 let local = listener.local_addr().unwrap();
318 assert_ne!(local.port(), 0);
319 }
320
321 #[tokio::test]
322 async fn tokio_connect_to_listener() {
323 let factory = NetworkFactory::tokio();
324 let addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
325 let mut listener = factory.bind_tcp(addr).await.unwrap();
326 let local = listener.local_addr().unwrap();
327
328 let accept_handle = tokio::spawn(async move { listener.accept().await.unwrap() });
329
330 let mut client = factory.connect_tcp(local).await.unwrap();
331 client.write_all(b"hello").await.unwrap();
332
333 let (mut server_stream, peer_addr) = accept_handle.await.unwrap();
334 assert_eq!(
335 peer_addr.ip(),
336 "127.0.0.1".parse::<std::net::IpAddr>().unwrap()
337 );
338
339 let mut buf = [0u8; 5];
340 server_stream.read_exact(&mut buf).await.unwrap();
341 assert_eq!(&buf, b"hello");
342 }
343
344 #[test]
345 fn custom_factory_is_simulated() {
346 let factory = NetworkFactory::new(
347 Box::new(|_addr| {
348 Box::pin(async move { Err(io::Error::new(io::ErrorKind::Unsupported, "stub")) })
349 }),
350 Box::new(|_addr| {
351 Box::pin(async move { Err(io::Error::new(io::ErrorKind::Unsupported, "stub")) })
352 }),
353 true,
354 );
355 assert!(factory.is_simulated());
356 }
357}