Skip to main content

irontide_session/
transport.rs

1//! Network transport abstraction layer.
2//!
3//! Provides [`NetworkFactory`] — a factory for creating TCP listeners and
4//! connections using either real tokio sockets (production) or pluggable
5//! in-memory channels (testing/simulation).
6//!
7//! The key abstraction is [`TransportListener`], an object-safe trait for
8//! accepting inbound connections, and [`BoxedStream`], a type-erased
9//! async read/write stream.
10
11use std::future::Future;
12use std::io;
13use std::net::SocketAddr;
14use std::pin::Pin;
15use std::task::{Context, Poll};
16
17use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
18use tokio::net::TcpListener;
19use tokio::net::TcpStream;
20
21use irontide_utp::UdpTransport;
22
23// ---------------------------------------------------------------------------
24// Type aliases — tame clippy::type_complexity
25// ---------------------------------------------------------------------------
26
27/// Boxed future returned by [`TransportListener::accept`].
28type AcceptFuture<'a> =
29    Pin<Box<dyn Future<Output = io::Result<(BoxedStream, SocketAddr)>> + Send + 'a>>;
30
31/// Closure type for [`NetworkFactory`]'s bind operation.
32type BindFn = Box<
33    dyn Fn(
34            SocketAddr,
35        ) -> Pin<Box<dyn Future<Output = io::Result<Box<dyn TransportListener>>> + Send>>
36        + Send
37        + Sync,
38>;
39
40/// Closure type for [`NetworkFactory`]'s connect operation.
41type ConnectFn = Box<
42    dyn Fn(SocketAddr) -> Pin<Box<dyn Future<Output = io::Result<BoxedStream>> + Send>>
43        + Send
44        + Sync,
45>;
46
47/// Closure type for [`NetworkFactory`]'s UDP bind operation.
48///
49/// Stage U entry point: lets `irontide-sim` plug a [`SimNetwork`]-backed
50/// [`UdpTransport`] into the production `irontide-utp` socket actor. The
51/// production tokio factory leaves this `None` — the session falls back
52/// to `UtpSocket::bind` directly so the FD-level DSCP / TCLASS setsockopt
53/// path remains in `irontide-utp::socket::bind`.
54type BindUdpFn = Box<
55    dyn Fn(SocketAddr) -> Pin<Box<dyn Future<Output = io::Result<Box<dyn UdpTransport>>> + Send>>
56        + Send
57        + Sync,
58>;
59
60// ---------------------------------------------------------------------------
61// BoxedStream
62// ---------------------------------------------------------------------------
63
64/// A type-erased bidirectional async stream.
65///
66/// Wraps any `AsyncRead + AsyncWrite + Unpin + Send` type behind a single
67/// trait object. This avoids the Rust limitation that `dyn` can only name
68/// one non-auto trait.
69pub struct BoxedStream {
70    inner: Pin<Box<dyn StreamRw + Send>>,
71}
72
73impl std::fmt::Debug for BoxedStream {
74    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
75        f.debug_struct("BoxedStream").finish_non_exhaustive()
76    }
77}
78
79/// Combined read/write supertrait for dyn compatibility.
80trait StreamRw: AsyncRead + AsyncWrite + Unpin {}
81impl<T: AsyncRead + AsyncWrite + Unpin> StreamRw for T {}
82
83impl BoxedStream {
84    /// Wrap any async read/write stream into a [`BoxedStream`].
85    pub fn new<S: AsyncRead + AsyncWrite + Unpin + Send + 'static>(stream: S) -> Self {
86        Self {
87            inner: Box::pin(stream),
88        }
89    }
90}
91
92impl AsyncRead for BoxedStream {
93    fn poll_read(
94        mut self: Pin<&mut Self>,
95        cx: &mut Context<'_>,
96        buf: &mut ReadBuf<'_>,
97    ) -> Poll<io::Result<()>> {
98        self.inner.as_mut().poll_read(cx, buf)
99    }
100}
101
102impl AsyncWrite for BoxedStream {
103    fn poll_write(
104        mut self: Pin<&mut Self>,
105        cx: &mut Context<'_>,
106        buf: &[u8],
107    ) -> Poll<io::Result<usize>> {
108        self.inner.as_mut().poll_write(cx, buf)
109    }
110
111    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
112        self.inner.as_mut().poll_flush(cx)
113    }
114
115    fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
116        self.inner.as_mut().poll_shutdown(cx)
117    }
118}
119
120impl Unpin for BoxedStream {}
121
122// ---------------------------------------------------------------------------
123// TransportListener
124// ---------------------------------------------------------------------------
125
126/// An object-safe listener that accepts inbound connections.
127///
128/// Implemented by [`TokioListener`] for real TCP sockets; simulation backends
129/// provide their own implementation backed by in-memory channels.
130///
131/// The `accept` method returns a boxed future for dyn compatibility.
132pub trait TransportListener: Send + Sync {
133    /// Accept the next inbound connection.
134    fn accept(&mut self) -> AcceptFuture<'_>;
135
136    /// Return the local address this listener is bound to.
137    ///
138    /// # Errors
139    ///
140    /// Returns an error if the session is shut down.
141    fn local_addr(&self) -> io::Result<SocketAddr>;
142}
143
144// ---------------------------------------------------------------------------
145// TokioListener
146// ---------------------------------------------------------------------------
147
148/// A [`TransportListener`] backed by a real [`tokio::net::TcpListener`].
149pub struct TokioListener(pub TcpListener);
150
151impl TransportListener for TokioListener {
152    fn accept(&mut self) -> AcceptFuture<'_> {
153        Box::pin(async move {
154            let (stream, addr) = self.0.accept().await?;
155            // RST on close instead of FIN → skip TIME_WAIT (see connect_tcp).
156            #[allow(deprecated)]
157            let _ = stream.set_linger(Some(std::time::Duration::ZERO));
158            Ok((BoxedStream::new(stream), addr))
159        })
160    }
161
162    fn local_addr(&self) -> io::Result<SocketAddr> {
163        self.0.local_addr()
164    }
165}
166
167// ---------------------------------------------------------------------------
168// NetworkFactory
169// ---------------------------------------------------------------------------
170
171/// Factory for creating TCP listeners and outbound connections.
172///
173/// In production, use [`NetworkFactory::tokio()`] to get a factory that
174/// delegates to real tokio networking. For simulation/testing, construct
175/// via [`NetworkFactory::new()`] with custom closures that route through
176/// in-memory channels.
177pub struct NetworkFactory {
178    bind_tcp: BindFn,
179    connect_tcp: ConnectFn,
180    bind_udp: Option<BindUdpFn>,
181    is_simulated: bool,
182}
183
184impl NetworkFactory {
185    /// Create a factory with custom bind/connect closures.
186    ///
187    /// This is the primary constructor for simulation backends. The
188    /// resulting factory has no UDP bind installed; chain
189    /// [`Self::with_bind_udp`] to plumb a sim UDP transport into the
190    /// `irontide-utp` socket actor.
191    #[must_use]
192    pub fn new(bind_tcp: BindFn, connect_tcp: ConnectFn, is_simulated: bool) -> Self {
193        Self {
194            bind_tcp,
195            connect_tcp,
196            bind_udp: None,
197            is_simulated,
198        }
199    }
200
201    /// Install a UDP bind closure on this factory.
202    ///
203    /// Used by simulation backends to route uTP datagrams through the
204    /// in-memory packet bus. Production callers should leave this unset
205    /// — `irontide-session::session` falls back to `UtpSocket::bind` so
206    /// FD-level DSCP / TCLASS configuration stays in `irontide-utp`.
207    #[must_use]
208    pub fn with_bind_udp(mut self, bind_udp: BindUdpFn) -> Self {
209        self.bind_udp = Some(bind_udp);
210        self
211    }
212
213    /// Create a factory that uses real tokio TCP networking.
214    #[must_use]
215    pub fn tokio() -> Self {
216        Self {
217            bind_tcp: Box::new(|addr| {
218                Box::pin(async move {
219                    let listener = TcpListener::bind(addr).await?;
220                    Ok(Box::new(TokioListener(listener)) as Box<dyn TransportListener>)
221                })
222            }),
223            connect_tcp: Box::new(|addr| {
224                Box::pin(async move {
225                    let stream = TcpStream::connect(addr).await?;
226                    // RST on close instead of FIN → skip TIME_WAIT.
227                    // Peer connections are ephemeral; TIME_WAIT accumulation
228                    // degrades performance across rapid reconnection cycles.
229                    // Safe: linger(0) sends RST immediately, never blocks.
230                    #[allow(deprecated)]
231                    let _ = stream.set_linger(Some(std::time::Duration::ZERO));
232                    Ok(BoxedStream::new(stream))
233                })
234            }),
235            bind_udp: None,
236            is_simulated: false,
237        }
238    }
239
240    /// Bind a TCP listener on the given address.
241    ///
242    /// # Errors
243    ///
244    /// Returns an error if the connection or binding fails.
245    pub async fn bind_tcp(&self, addr: SocketAddr) -> io::Result<Box<dyn TransportListener>> {
246        (self.bind_tcp)(addr).await
247    }
248
249    /// Open an outbound TCP connection to the given address.
250    ///
251    /// # Errors
252    ///
253    /// Returns an error if the connection or binding fails.
254    pub async fn connect_tcp(&self, addr: SocketAddr) -> io::Result<BoxedStream> {
255        (self.connect_tcp)(addr).await
256    }
257
258    /// Returns `true` if this factory uses simulated networking.
259    #[must_use]
260    pub fn is_simulated(&self) -> bool {
261        self.is_simulated
262    }
263
264    /// Returns `true` if this factory has a UDP bind closure installed.
265    ///
266    /// Stage U: callers branch on this to choose between the production
267    /// `UtpSocket::bind` path (DSCP / TCLASS aware) and the sim
268    /// `UtpSocket::bind_with_transport` path (in-memory packet bus).
269    #[must_use]
270    pub fn has_bind_udp(&self) -> bool {
271        self.bind_udp.is_some()
272    }
273
274    /// Bind a UDP transport at the given address.
275    ///
276    /// # Errors
277    ///
278    /// Returns `Unsupported` if no UDP bind closure is installed
279    /// (the tokio factory's default), otherwise the underlying
280    /// transport's I/O error.
281    pub async fn bind_udp(&self, addr: SocketAddr) -> io::Result<Box<dyn UdpTransport>> {
282        match self.bind_udp.as_ref() {
283            Some(f) => f(addr).await,
284            None => Err(io::Error::new(
285                io::ErrorKind::Unsupported,
286                "factory has no UDP bind installed",
287            )),
288        }
289    }
290}
291
292// ---------------------------------------------------------------------------
293// Tests
294// ---------------------------------------------------------------------------
295
296#[cfg(test)]
297mod tests {
298    use super::*;
299    use tokio::io::{AsyncReadExt, AsyncWriteExt};
300
301    #[test]
302    fn tokio_factory_creation() {
303        let _factory = NetworkFactory::tokio();
304    }
305
306    #[test]
307    fn tokio_factory_is_not_simulated() {
308        let factory = NetworkFactory::tokio();
309        assert!(!factory.is_simulated());
310    }
311
312    #[tokio::test]
313    async fn tokio_bind_and_accept() {
314        let factory = NetworkFactory::tokio();
315        let addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
316        let listener = factory.bind_tcp(addr).await.unwrap();
317        let local = listener.local_addr().unwrap();
318        assert_ne!(local.port(), 0);
319    }
320
321    #[tokio::test]
322    async fn tokio_connect_to_listener() {
323        let factory = NetworkFactory::tokio();
324        let addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
325        let mut listener = factory.bind_tcp(addr).await.unwrap();
326        let local = listener.local_addr().unwrap();
327
328        let accept_handle = tokio::spawn(async move { listener.accept().await.unwrap() });
329
330        let mut client = factory.connect_tcp(local).await.unwrap();
331        client.write_all(b"hello").await.unwrap();
332
333        let (mut server_stream, peer_addr) = accept_handle.await.unwrap();
334        assert_eq!(
335            peer_addr.ip(),
336            "127.0.0.1".parse::<std::net::IpAddr>().unwrap()
337        );
338
339        let mut buf = [0u8; 5];
340        server_stream.read_exact(&mut buf).await.unwrap();
341        assert_eq!(&buf, b"hello");
342    }
343
344    #[test]
345    fn custom_factory_is_simulated() {
346        let factory = NetworkFactory::new(
347            Box::new(|_addr| {
348                Box::pin(async move { Err(io::Error::new(io::ErrorKind::Unsupported, "stub")) })
349            }),
350            Box::new(|_addr| {
351                Box::pin(async move { Err(io::Error::new(io::ErrorKind::Unsupported, "stub")) })
352            }),
353            true,
354        );
355        assert!(factory.is_simulated());
356    }
357}