Skip to main content

dynomite/io/
reactor.rs

1//! Async transport abstraction over the connection state machine.
2//!
3//! The C engine's per-platform `src/event/dyn_{epoll,kqueue,evport}.c`
4//! reactor is replaced wholesale by tokio. This module defines a
5//! [`Transport`] trait that downstream stages drive through
6//! `tokio::io::{AsyncRead, AsyncWrite}` and a [`ConnRole`] enum that
7//! mirrors the connection-role enumerations carried on the C `struct
8//! conn` (`client`, `server`, `proxy`, plus their dnode peer
9//! variants).
10//!
11//! The trait is intentionally narrow:
12//!
13//! * [`Transport::role`] returns the connection role for routing /
14//!   metric tagging.
15//! * [`Transport::peer_addr`] returns the remote address when the
16//!   transport is connected to a known endpoint. QUIC, for example,
17//!   may not always have one.
18//!
19//! `Transport` does not expose any TCP-specific operations; this is
20//! deliberate so the Stage 9 QUIC implementation can wrap a
21//! `quiche::Connection` in a `QuicTransport` newtype without changing
22//! callers.
23//!
24//! [`TcpTransport`] is the TCP implementation, a newtype wrapper
25//! around [`tokio::net::TcpStream`].
26//!
27//! # Examples
28//!
29//! ```
30//! use dynomite::io::reactor::{ConnRole, TcpTransport, Transport};
31//! use tokio::io::{AsyncReadExt, AsyncWriteExt};
32//! use tokio::net::{TcpListener, TcpStream};
33//!
34//! # tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(async {
35//! let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
36//! let addr = listener.local_addr().unwrap();
37//! let server = tokio::spawn(async move {
38//!     let (sock, _) = listener.accept().await.unwrap();
39//!     let mut t = TcpTransport::new(sock, ConnRole::Server);
40//!     let mut buf = [0u8; 5];
41//!     t.read_exact(&mut buf).await.unwrap();
42//!     t.write_all(&buf).await.unwrap();
43//! });
44//! let sock = TcpStream::connect(addr).await.unwrap();
45//! let mut client = TcpTransport::new(sock, ConnRole::Client);
46//! client.write_all(b"hello").await.unwrap();
47//! let mut out = [0u8; 5];
48//! client.read_exact(&mut out).await.unwrap();
49//! assert_eq!(&out, b"hello");
50//! server.await.unwrap();
51//! # });
52//! ```
53
54use std::io;
55use std::net::SocketAddr;
56use std::pin::Pin;
57use std::task::{Context, Poll};
58
59use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
60use tokio::net::TcpStream;
61
62/// Role tag for a connection. Mirrors the role enumeration on
63/// `struct conn` in the C engine.
64#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
65pub enum ConnRole {
66    /// Client-facing listener that accepted the connection.
67    Proxy,
68    /// Connection from a client driver (Redis or Memcached).
69    Client,
70    /// Connection to a backend datastore (Redis or Memcached).
71    Server,
72    /// Listener that accepts dnode peer connections from other nodes.
73    DnodePeerProxy,
74    /// Inbound dnode peer connection (a remote node connected to us).
75    DnodePeerClient,
76    /// Outbound dnode peer connection (we connected to a remote node).
77    DnodePeerServer,
78}
79
80impl ConnRole {
81    /// True when the role represents a listening socket. Mirrors the
82    /// `is_listener` predicate used by the C reactor's dispatch
83    /// branches.
84    ///
85    /// # Examples
86    ///
87    /// ```
88    /// use dynomite::io::reactor::ConnRole;
89    /// assert!(ConnRole::Proxy.is_listener());
90    /// assert!(!ConnRole::Client.is_listener());
91    /// ```
92    pub fn is_listener(self) -> bool {
93        matches!(self, Self::Proxy | Self::DnodePeerProxy)
94    }
95
96    /// True when the role represents a peer-to-peer dnode link.
97    ///
98    /// # Examples
99    ///
100    /// ```
101    /// use dynomite::io::reactor::ConnRole;
102    /// assert!(ConnRole::DnodePeerClient.is_dnode_peer());
103    /// assert!(!ConnRole::Client.is_dnode_peer());
104    /// ```
105    pub fn is_dnode_peer(self) -> bool {
106        matches!(
107            self,
108            Self::DnodePeerProxy | Self::DnodePeerClient | Self::DnodePeerServer
109        )
110    }
111}
112
113/// Generic async byte-stream the engine reads and writes through.
114///
115/// Implementors must be [`Send`] and [`Unpin`] so they fit the tokio
116/// task model. Adding a new transport (Stage 9 QUIC, for example)
117/// means newtyping its connection handle and implementing
118/// [`AsyncRead`], [`AsyncWrite`], and `Transport`.
119pub trait Transport: AsyncRead + AsyncWrite + Send + Unpin {
120    /// Connection role.
121    fn role(&self) -> ConnRole;
122
123    /// Remote socket address when the transport is connected to one.
124    /// Returns `None` for transports that do not surface a
125    /// `SocketAddr` (for example, QUIC over a virtual interface).
126    fn peer_addr(&self) -> Option<SocketAddr>;
127}
128
129/// TCP-backed [`Transport`].
130///
131/// Newtype around [`tokio::net::TcpStream`]. The role is supplied at
132/// construction time because the same TCP socket type is used for
133/// every C role variant; the discriminator lives one level up in the
134/// listener registration.
135#[derive(Debug)]
136pub struct TcpTransport {
137    inner: TcpStream,
138    role: ConnRole,
139}
140
141impl TcpTransport {
142    /// Wrap a TCP stream with the given role tag.
143    ///
144    /// # Examples
145    ///
146    /// ```
147    /// use dynomite::io::reactor::{ConnRole, TcpTransport, Transport};
148    /// use tokio::net::{TcpListener, TcpStream};
149    ///
150    /// # tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(async {
151    /// let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
152    /// let addr = listener.local_addr().unwrap();
153    /// let _accept = tokio::spawn(async move {
154    ///     let (s, _) = listener.accept().await.unwrap();
155    ///     drop(s);
156    /// });
157    /// let sock = TcpStream::connect(addr).await.unwrap();
158    /// let t = TcpTransport::new(sock, ConnRole::Client);
159    /// assert_eq!(t.role(), ConnRole::Client);
160    /// # });
161    /// ```
162    pub fn new(stream: TcpStream, role: ConnRole) -> Self {
163        Self {
164            inner: stream,
165            role,
166        }
167    }
168
169    /// Borrow the wrapped tokio stream.
170    pub fn get_ref(&self) -> &TcpStream {
171        &self.inner
172    }
173
174    /// Mutably borrow the wrapped tokio stream. Useful for setting
175    /// per-socket options (`set_nodelay`, `set_linger`, ...) without
176    /// re-implementing them on the wrapper.
177    pub fn get_mut(&mut self) -> &mut TcpStream {
178        &mut self.inner
179    }
180
181    /// Consume the wrapper and return the inner stream.
182    pub fn into_inner(self) -> TcpStream {
183        self.inner
184    }
185}
186
187impl Transport for TcpTransport {
188    fn role(&self) -> ConnRole {
189        self.role
190    }
191
192    fn peer_addr(&self) -> Option<SocketAddr> {
193        self.inner.peer_addr().ok()
194    }
195}
196
197impl AsyncRead for TcpTransport {
198    fn poll_read(
199        mut self: Pin<&mut Self>,
200        cx: &mut Context<'_>,
201        buf: &mut ReadBuf<'_>,
202    ) -> Poll<io::Result<()>> {
203        Pin::new(&mut self.inner).poll_read(cx, buf)
204    }
205}
206
207impl AsyncWrite for TcpTransport {
208    fn poll_write(
209        mut self: Pin<&mut Self>,
210        cx: &mut Context<'_>,
211        buf: &[u8],
212    ) -> Poll<io::Result<usize>> {
213        Pin::new(&mut self.inner).poll_write(cx, buf)
214    }
215
216    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
217        Pin::new(&mut self.inner).poll_flush(cx)
218    }
219
220    fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
221        Pin::new(&mut self.inner).poll_shutdown(cx)
222    }
223
224    fn poll_write_vectored(
225        mut self: Pin<&mut Self>,
226        cx: &mut Context<'_>,
227        bufs: &[io::IoSlice<'_>],
228    ) -> Poll<io::Result<usize>> {
229        Pin::new(&mut self.inner).poll_write_vectored(cx, bufs)
230    }
231
232    fn is_write_vectored(&self) -> bool {
233        self.inner.is_write_vectored()
234    }
235}
236
237#[cfg(test)]
238mod tests {
239    use super::*;
240    use tokio::io::{AsyncReadExt, AsyncWriteExt};
241    use tokio::net::{TcpListener, TcpStream};
242
243    #[test]
244    fn role_predicates() {
245        assert!(ConnRole::Proxy.is_listener());
246        assert!(ConnRole::DnodePeerProxy.is_listener());
247        assert!(!ConnRole::Server.is_listener());
248        assert!(ConnRole::DnodePeerServer.is_dnode_peer());
249        assert!(!ConnRole::Server.is_dnode_peer());
250    }
251
252    #[tokio::test]
253    async fn tcp_transport_round_trip() {
254        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
255        let addr = listener.local_addr().unwrap();
256        let server = tokio::spawn(async move {
257            let (s, _) = listener.accept().await.unwrap();
258            let mut t = TcpTransport::new(s, ConnRole::Server);
259            assert_eq!(t.role(), ConnRole::Server);
260            let mut buf = [0u8; 4];
261            t.read_exact(&mut buf).await.unwrap();
262            t.write_all(&buf).await.unwrap();
263        });
264        let sock = TcpStream::connect(addr).await.unwrap();
265        let mut client = TcpTransport::new(sock, ConnRole::Client);
266        assert_eq!(client.role(), ConnRole::Client);
267        assert!(client.peer_addr().is_some());
268        client.write_all(b"ping").await.unwrap();
269        let mut out = [0u8; 4];
270        client.read_exact(&mut out).await.unwrap();
271        assert_eq!(&out, b"ping");
272        server.await.unwrap();
273    }
274}