dynomite/io/reactor.rs
1//! Async transport abstraction over the connection state machine.
2//!
3//! The C engine's per-platform `src/event/dyn_{epoll,kqueue,evport}.c`
4//! reactor is replaced wholesale by tokio. This module defines a
5//! [`Transport`] trait that downstream stages drive through
6//! `tokio::io::{AsyncRead, AsyncWrite}` and a [`ConnRole`] enum that
7//! mirrors the connection-role enumerations carried on the C `struct
8//! conn` (`client`, `server`, `proxy`, plus their dnode peer
9//! variants).
10//!
11//! The trait is intentionally narrow:
12//!
13//! * [`Transport::role`] returns the connection role for routing /
14//! metric tagging.
15//! * [`Transport::peer_addr`] returns the remote address when the
16//! transport is connected to a known endpoint. QUIC, for example,
17//! may not always have one.
18//!
19//! `Transport` does not expose any TCP-specific operations; this is
20//! deliberate so the Stage 9 QUIC implementation can wrap a
21//! `quiche::Connection` in a `QuicTransport` newtype without changing
22//! callers.
23//!
24//! [`TcpTransport`] is the TCP implementation, a newtype wrapper
25//! around [`tokio::net::TcpStream`].
26//!
27//! # Examples
28//!
29//! ```
30//! use dynomite::io::reactor::{ConnRole, TcpTransport, Transport};
31//! use tokio::io::{AsyncReadExt, AsyncWriteExt};
32//! use tokio::net::{TcpListener, TcpStream};
33//!
34//! # tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(async {
35//! let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
36//! let addr = listener.local_addr().unwrap();
37//! let server = tokio::spawn(async move {
38//! let (sock, _) = listener.accept().await.unwrap();
39//! let mut t = TcpTransport::new(sock, ConnRole::Server);
40//! let mut buf = [0u8; 5];
41//! t.read_exact(&mut buf).await.unwrap();
42//! t.write_all(&buf).await.unwrap();
43//! });
44//! let sock = TcpStream::connect(addr).await.unwrap();
45//! let mut client = TcpTransport::new(sock, ConnRole::Client);
46//! client.write_all(b"hello").await.unwrap();
47//! let mut out = [0u8; 5];
48//! client.read_exact(&mut out).await.unwrap();
49//! assert_eq!(&out, b"hello");
50//! server.await.unwrap();
51//! # });
52//! ```
53
54use std::io;
55use std::net::SocketAddr;
56use std::pin::Pin;
57use std::task::{Context, Poll};
58
59use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
60use tokio::net::TcpStream;
61
62/// Role tag for a connection. Mirrors the role enumeration on
63/// `struct conn` in the C engine.
64#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
65pub enum ConnRole {
66 /// Client-facing listener that accepted the connection.
67 Proxy,
68 /// Connection from a client driver (Redis or Memcached).
69 Client,
70 /// Connection to a backend datastore (Redis or Memcached).
71 Server,
72 /// Listener that accepts dnode peer connections from other nodes.
73 DnodePeerProxy,
74 /// Inbound dnode peer connection (a remote node connected to us).
75 DnodePeerClient,
76 /// Outbound dnode peer connection (we connected to a remote node).
77 DnodePeerServer,
78}
79
80impl ConnRole {
81 /// True when the role represents a listening socket. Mirrors the
82 /// `is_listener` predicate used by the C reactor's dispatch
83 /// branches.
84 ///
85 /// # Examples
86 ///
87 /// ```
88 /// use dynomite::io::reactor::ConnRole;
89 /// assert!(ConnRole::Proxy.is_listener());
90 /// assert!(!ConnRole::Client.is_listener());
91 /// ```
92 pub fn is_listener(self) -> bool {
93 matches!(self, Self::Proxy | Self::DnodePeerProxy)
94 }
95
96 /// True when the role represents a peer-to-peer dnode link.
97 ///
98 /// # Examples
99 ///
100 /// ```
101 /// use dynomite::io::reactor::ConnRole;
102 /// assert!(ConnRole::DnodePeerClient.is_dnode_peer());
103 /// assert!(!ConnRole::Client.is_dnode_peer());
104 /// ```
105 pub fn is_dnode_peer(self) -> bool {
106 matches!(
107 self,
108 Self::DnodePeerProxy | Self::DnodePeerClient | Self::DnodePeerServer
109 )
110 }
111}
112
113/// Generic async byte-stream the engine reads and writes through.
114///
115/// Implementors must be [`Send`] and [`Unpin`] so they fit the tokio
116/// task model. Adding a new transport (Stage 9 QUIC, for example)
117/// means newtyping its connection handle and implementing
118/// [`AsyncRead`], [`AsyncWrite`], and `Transport`.
119pub trait Transport: AsyncRead + AsyncWrite + Send + Unpin {
120 /// Connection role.
121 fn role(&self) -> ConnRole;
122
123 /// Remote socket address when the transport is connected to one.
124 /// Returns `None` for transports that do not surface a
125 /// `SocketAddr` (for example, QUIC over a virtual interface).
126 fn peer_addr(&self) -> Option<SocketAddr>;
127}
128
129/// TCP-backed [`Transport`].
130///
131/// Newtype around [`tokio::net::TcpStream`]. The role is supplied at
132/// construction time because the same TCP socket type is used for
133/// every C role variant; the discriminator lives one level up in the
134/// listener registration.
135#[derive(Debug)]
136pub struct TcpTransport {
137 inner: TcpStream,
138 role: ConnRole,
139}
140
141impl TcpTransport {
142 /// Wrap a TCP stream with the given role tag.
143 ///
144 /// # Examples
145 ///
146 /// ```
147 /// use dynomite::io::reactor::{ConnRole, TcpTransport, Transport};
148 /// use tokio::net::{TcpListener, TcpStream};
149 ///
150 /// # tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(async {
151 /// let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
152 /// let addr = listener.local_addr().unwrap();
153 /// let _accept = tokio::spawn(async move {
154 /// let (s, _) = listener.accept().await.unwrap();
155 /// drop(s);
156 /// });
157 /// let sock = TcpStream::connect(addr).await.unwrap();
158 /// let t = TcpTransport::new(sock, ConnRole::Client);
159 /// assert_eq!(t.role(), ConnRole::Client);
160 /// # });
161 /// ```
162 pub fn new(stream: TcpStream, role: ConnRole) -> Self {
163 Self {
164 inner: stream,
165 role,
166 }
167 }
168
169 /// Borrow the wrapped tokio stream.
170 pub fn get_ref(&self) -> &TcpStream {
171 &self.inner
172 }
173
174 /// Mutably borrow the wrapped tokio stream. Useful for setting
175 /// per-socket options (`set_nodelay`, `set_linger`, ...) without
176 /// re-implementing them on the wrapper.
177 pub fn get_mut(&mut self) -> &mut TcpStream {
178 &mut self.inner
179 }
180
181 /// Consume the wrapper and return the inner stream.
182 pub fn into_inner(self) -> TcpStream {
183 self.inner
184 }
185}
186
187impl Transport for TcpTransport {
188 fn role(&self) -> ConnRole {
189 self.role
190 }
191
192 fn peer_addr(&self) -> Option<SocketAddr> {
193 self.inner.peer_addr().ok()
194 }
195}
196
197impl AsyncRead for TcpTransport {
198 fn poll_read(
199 mut self: Pin<&mut Self>,
200 cx: &mut Context<'_>,
201 buf: &mut ReadBuf<'_>,
202 ) -> Poll<io::Result<()>> {
203 Pin::new(&mut self.inner).poll_read(cx, buf)
204 }
205}
206
207impl AsyncWrite for TcpTransport {
208 fn poll_write(
209 mut self: Pin<&mut Self>,
210 cx: &mut Context<'_>,
211 buf: &[u8],
212 ) -> Poll<io::Result<usize>> {
213 Pin::new(&mut self.inner).poll_write(cx, buf)
214 }
215
216 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
217 Pin::new(&mut self.inner).poll_flush(cx)
218 }
219
220 fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
221 Pin::new(&mut self.inner).poll_shutdown(cx)
222 }
223
224 fn poll_write_vectored(
225 mut self: Pin<&mut Self>,
226 cx: &mut Context<'_>,
227 bufs: &[io::IoSlice<'_>],
228 ) -> Poll<io::Result<usize>> {
229 Pin::new(&mut self.inner).poll_write_vectored(cx, bufs)
230 }
231
232 fn is_write_vectored(&self) -> bool {
233 self.inner.is_write_vectored()
234 }
235}
236
237#[cfg(test)]
238mod tests {
239 use super::*;
240 use tokio::io::{AsyncReadExt, AsyncWriteExt};
241 use tokio::net::{TcpListener, TcpStream};
242
243 #[test]
244 fn role_predicates() {
245 assert!(ConnRole::Proxy.is_listener());
246 assert!(ConnRole::DnodePeerProxy.is_listener());
247 assert!(!ConnRole::Server.is_listener());
248 assert!(ConnRole::DnodePeerServer.is_dnode_peer());
249 assert!(!ConnRole::Server.is_dnode_peer());
250 }
251
252 #[tokio::test]
253 async fn tcp_transport_round_trip() {
254 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
255 let addr = listener.local_addr().unwrap();
256 let server = tokio::spawn(async move {
257 let (s, _) = listener.accept().await.unwrap();
258 let mut t = TcpTransport::new(s, ConnRole::Server);
259 assert_eq!(t.role(), ConnRole::Server);
260 let mut buf = [0u8; 4];
261 t.read_exact(&mut buf).await.unwrap();
262 t.write_all(&buf).await.unwrap();
263 });
264 let sock = TcpStream::connect(addr).await.unwrap();
265 let mut client = TcpTransport::new(sock, ConnRole::Client);
266 assert_eq!(client.role(), ConnRole::Client);
267 assert!(client.peer_addr().is_some());
268 client.write_all(b"ping").await.unwrap();
269 let mut out = [0u8; 4];
270 client.read_exact(&mut out).await.unwrap();
271 assert_eq!(&out, b"ping");
272 server.await.unwrap();
273 }
274}