Skip to main content

dynomite/net/
conn.rs

1//! Per-connection state and the shared event loop.
2//!
3//! [`Conn`] is the Rust counterpart of the C engine's `struct conn`.
4//! It owns:
5//!
6//! * the [`Transport`] the connection reads and writes through,
7//! * the recv mbuf chain ([`MbufQueue`]) the parser drains,
8//! * the send mbuf chain the writer drains,
9//! * the in-queue (`imsg_q`) of incoming requests waiting to be
10//!   forwarded,
11//! * the out-queue (`omsg_q`) of outstanding requests awaiting
12//!   responses,
13//! * the parser scratch state for partial messages,
14//! * counters and lifecycle bits the FSM sets and reads.
15//!
16//! The role-specific behavior (PROXY accepts, CLIENT parses
17//! datastore requests, SERVER pumps to the backend, etc.) lives in
18//! the sibling modules; [`Conn`] hosts the shared data shape and a
19//! small set of top-level lifecycle methods so multiple roles can
20//! reuse the same skeleton.
21//!
22//! # Examples
23//!
24//! ```no_run
25//! use dynomite::io::reactor::{ConnRole, TcpTransport};
26//! use dynomite::net::Conn;
27//! use tokio::net::TcpStream;
28//! # tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(async {
29//! let stream = TcpStream::connect("127.0.0.1:6379").await.unwrap();
30//! let transport = Box::new(TcpTransport::new(stream, ConnRole::Server));
31//! let mut conn = Conn::new(transport, ConnRole::Server);
32//! assert_eq!(conn.role(), ConnRole::Server);
33//! conn.close();
34//! # });
35//! ```
36
37use std::collections::HashMap;
38use std::net::SocketAddr;
39use std::sync::atomic::{AtomicU64, Ordering};
40
41use crate::core::types::MsgId;
42use crate::io::mbuf::{MbufPool, MbufQueue};
43use crate::io::reactor::{ConnRole, Transport};
44use crate::msg::{ConsistencyLevel, Msg, MsgQueue};
45
46use super::NetError;
47
/// Upper bound on the number of messages a single connection queue
/// may hold before further enqueues are refused.
///
/// Mirrors the reference `MAX_CONN_QUEUE_SIZE` constant in
/// `dyn_connection.h`. Where the C engine logs and rejects on
/// overflow, the Rust port reports the condition to the caller:
/// [`Conn::enqueue_in`] and [`Conn::enqueue_out`] compare the current
/// queue length against this cap and return
/// [`NetError::PoolExhausted`] once it has been reached.
pub const MAX_CONN_QUEUE_SIZE: usize = 20_000;
56
/// Running totals that every [`Conn`] keeps about its own traffic.
///
/// Counterpart of the per-connection byte counters and event totals
/// that the C engine carries on `struct conn` (`recv_bytes`,
/// `send_bytes`, `events`). Every counter starts at zero through the
/// derived [`Default`].
#[derive(Clone, Debug, Default)]
pub struct ConnStats {
    /// Total bytes read into the recv mbuf chain; advanced by
    /// [`Conn::record_recv`].
    pub recv_bytes: u64,
    /// Total bytes written out of the send mbuf chain; advanced by
    /// [`Conn::record_send`].
    pub send_bytes: u64,
    /// Messages accepted onto `imsg_q` by [`Conn::enqueue_in`].
    pub recv_msgs: u64,
    /// Messages accepted onto `omsg_q` by [`Conn::enqueue_out`].
    pub send_msgs: u64,
    /// Number of completed reads, one per [`Conn::record_recv`] call.
    pub recv_events: u64,
    /// Number of completed writes, one per [`Conn::record_send`] call.
    pub send_events: u64,
}
77
/// Stable, process-unique connection identifier.
///
/// A handle wraps a value drawn from a monotonic 64-bit counter, so
/// the id stays unique across reconnects and across transports (TCP,
/// QUIC). Handles are cheap to copy and can be used as hash-map keys.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct ConnHandle(u64);

impl ConnHandle {
    /// Return the raw 64-bit id wrapped by this handle.
    ///
    /// # Examples
    /// ```
    /// # use dynomite::io::reactor::{ConnRole, TcpTransport};
    /// # use dynomite::net::Conn;
    /// # tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(async {
    /// let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
    /// let addr = listener.local_addr().unwrap();
    /// let _accept = tokio::spawn(async move {
    ///     let (s, _) = listener.accept().await.unwrap();
    ///     drop(s);
    /// });
    /// let s = tokio::net::TcpStream::connect(addr).await.unwrap();
    /// let transport = Box::new(TcpTransport::new(s, ConnRole::Server));
    /// let conn = Conn::new(transport, ConnRole::Server);
    /// assert!(conn.handle().raw() > 0);
    /// # });
    /// ```
    #[must_use]
    pub fn raw(self) -> u64 {
        let Self(id) = self;
        id
    }
}
111
112static NEXT_HANDLE: AtomicU64 = AtomicU64::new(1);
113
114fn next_handle() -> ConnHandle {
115    ConnHandle(NEXT_HANDLE.fetch_add(1, Ordering::Relaxed))
116}
117
118/// Connection state shared across role-specific FSMs.
119#[allow(clippy::struct_excessive_bools)]
120pub struct Conn {
121    handle: ConnHandle,
122    role: ConnRole,
123    transport: Option<Box<dyn Transport>>,
124    peer_addr: Option<SocketAddr>,
125    recv: MbufQueue,
126    send: MbufQueue,
127    imsg_q: MsgQueue,
128    omsg_q: MsgQueue,
129    rmsg: Option<Msg>,
130    smsg: Option<Msg>,
131    stats: ConnStats,
132    eof: bool,
133    done: bool,
134    err: Option<String>,
135    read_consistency: ConsistencyLevel,
136    write_consistency: ConsistencyLevel,
137    same_dc: bool,
138    dyn_mode: bool,
139    dnode_secured: bool,
140    crypto_key_sent: bool,
141    aes_key: Option<[u8; 32]>,
142    outstanding: HashMap<MsgId, MsgId>,
143    pool: MbufPool,
144}
145
146impl std::fmt::Debug for Conn {
147    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
148        // Lifecycle bits and counters - the transport handle and
149        // the mbuf pool intentionally render as opaque.
150        let _ = (
151            &self.transport,
152            &self.read_consistency,
153            &self.write_consistency,
154            &self.same_dc,
155            &self.dyn_mode,
156            &self.dnode_secured,
157            &self.crypto_key_sent,
158            &self.aes_key,
159            &self.outstanding,
160            &self.pool,
161            &self.stats,
162            &self.rmsg,
163            &self.smsg,
164        );
165        f.debug_struct("Conn")
166            .field("handle", &self.handle)
167            .field("role", &self.role)
168            .field("peer_addr", &self.peer_addr)
169            .field("recv_chain", &self.recv.len())
170            .field("send_chain", &self.send.len())
171            .field("imsg_q", &self.imsg_q.len())
172            .field("omsg_q", &self.omsg_q.len())
173            .field("recv_bytes", &self.stats.recv_bytes)
174            .field("send_bytes", &self.stats.send_bytes)
175            .field("eof", &self.eof)
176            .field("done", &self.done)
177            .field("err", &self.err)
178            .field("read_consistency", &self.read_consistency)
179            .field("write_consistency", &self.write_consistency)
180            .field("same_dc", &self.same_dc)
181            .field("dyn_mode", &self.dyn_mode)
182            .field("dnode_secured", &self.dnode_secured)
183            .field("crypto_key_sent", &self.crypto_key_sent)
184            .field("aes_key_set", &self.aes_key.is_some())
185            .field("outstanding", &self.outstanding.len())
186            .finish()
187    }
188}
189
190impl Conn {
191    /// Build a new connection wrapping `transport` and tagged with
192    /// `role`.
193    ///
194    /// The connection starts with empty mbuf chains, no in-flight
195    /// messages, and the default consistency knobs (`DcOne`).
196    /// Role-specific drivers in this module set extra fields (the
197    /// dnode peer paths set `same_dc`, `dyn_mode`, etc.) before
198    /// invoking [`Conn::run`].
199    ///
200    /// # Examples
201    /// ```no_run
202    /// use dynomite::io::reactor::{ConnRole, TcpTransport};
203    /// use dynomite::net::Conn;
204    /// use tokio::net::TcpStream;
205    /// # tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(async {
206    /// let s = TcpStream::connect("127.0.0.1:6379").await.unwrap();
207    /// let _ = Conn::new(Box::new(TcpTransport::new(s, ConnRole::Server)), ConnRole::Server);
208    /// # });
209    /// ```
210    pub fn new(transport: Box<dyn Transport>, role: ConnRole) -> Self {
211        let peer_addr = transport.peer_addr();
212        Self {
213            handle: next_handle(),
214            role,
215            transport: Some(transport),
216            peer_addr,
217            recv: MbufQueue::new(),
218            send: MbufQueue::new(),
219            imsg_q: MsgQueue::new(),
220            omsg_q: MsgQueue::new(),
221            rmsg: None,
222            smsg: None,
223            stats: ConnStats::default(),
224            eof: false,
225            done: false,
226            err: None,
227            read_consistency: ConsistencyLevel::DcOne,
228            write_consistency: ConsistencyLevel::DcOne,
229            same_dc: true,
230            dyn_mode: matches!(
231                role,
232                ConnRole::DnodePeerProxy | ConnRole::DnodePeerClient | ConnRole::DnodePeerServer
233            ),
234            dnode_secured: false,
235            crypto_key_sent: false,
236            aes_key: None,
237            outstanding: HashMap::new(),
238            pool: MbufPool::default(),
239        }
240    }
241
242    /// Stable connection handle.
243    #[must_use]
244    pub fn handle(&self) -> ConnHandle {
245        self.handle
246    }
247
248    /// Connection role.
249    #[must_use]
250    pub fn role(&self) -> ConnRole {
251        self.role
252    }
253
254    /// Remote address, if the transport could surface one.
255    #[must_use]
256    pub fn peer_addr(&self) -> Option<SocketAddr> {
257        self.peer_addr
258    }
259
260    /// Borrow the rolling counters.
261    #[must_use]
262    pub fn stats(&self) -> &ConnStats {
263        &self.stats
264    }
265
266    /// Borrow the recv mbuf chain.
267    #[must_use]
268    pub fn recv_chain(&self) -> &MbufQueue {
269        &self.recv
270    }
271
272    /// Mutably borrow the recv mbuf chain.
273    pub fn recv_chain_mut(&mut self) -> &mut MbufQueue {
274        &mut self.recv
275    }
276
277    /// Borrow the send mbuf chain.
278    #[must_use]
279    pub fn send_chain(&self) -> &MbufQueue {
280        &self.send
281    }
282
283    /// Mutably borrow the send mbuf chain.
284    pub fn send_chain_mut(&mut self) -> &mut MbufQueue {
285        &mut self.send
286    }
287
288    /// Borrow the in-queue.
289    #[must_use]
290    pub fn imsg_q(&self) -> &MsgQueue {
291        &self.imsg_q
292    }
293
294    /// Mutably borrow the in-queue.
295    pub fn imsg_q_mut(&mut self) -> &mut MsgQueue {
296        &mut self.imsg_q
297    }
298
299    /// Borrow the out-queue.
300    #[must_use]
301    pub fn omsg_q(&self) -> &MsgQueue {
302        &self.omsg_q
303    }
304
305    /// Mutably borrow the out-queue.
306    pub fn omsg_q_mut(&mut self) -> &mut MsgQueue {
307        &mut self.omsg_q
308    }
309
310    /// Borrow the partially-received message, if any.
311    #[must_use]
312    pub fn rmsg(&self) -> Option<&Msg> {
313        self.rmsg.as_ref()
314    }
315
316    /// Mutably borrow the partially-received message.
317    pub fn rmsg_mut(&mut self) -> Option<&mut Msg> {
318        self.rmsg.as_mut()
319    }
320
321    /// Take ownership of the partially-received message, leaving
322    /// the slot empty.
323    pub fn take_rmsg(&mut self) -> Option<Msg> {
324        self.rmsg.take()
325    }
326
327    /// Install the partially-received message slot.
328    pub fn set_rmsg(&mut self, msg: Option<Msg>) {
329        self.rmsg = msg;
330    }
331
332    /// Borrow the partially-sent message, if any.
333    #[must_use]
334    pub fn smsg(&self) -> Option<&Msg> {
335        self.smsg.as_ref()
336    }
337
338    /// Take the partially-sent message.
339    pub fn take_smsg(&mut self) -> Option<Msg> {
340        self.smsg.take()
341    }
342
343    /// Install the partially-sent message slot.
344    pub fn set_smsg(&mut self, msg: Option<Msg>) {
345        self.smsg = msg;
346    }
347
348    /// True when the peer has closed its half of the connection.
349    #[must_use]
350    pub fn is_eof(&self) -> bool {
351        self.eof
352    }
353
354    /// Mark the peer half as closed.
355    pub fn set_eof(&mut self) {
356        self.eof = true;
357    }
358
359    /// True when the connection has finished (either side closed
360    /// and all queued work drained).
361    #[must_use]
362    pub fn is_done(&self) -> bool {
363        self.done
364    }
365
366    /// Mark the connection as done.
367    pub fn set_done(&mut self) {
368        self.done = true;
369    }
370
371    /// Last recorded error, if any.
372    #[must_use]
373    pub fn err(&self) -> Option<&str> {
374        self.err.as_deref()
375    }
376
377    /// Record a transport-level error string.
378    pub fn set_err<S: Into<String>>(&mut self, msg: S) {
379        self.err = Some(msg.into());
380    }
381
382    /// Read consistency level applied to incoming reads on this
383    /// connection.
384    #[must_use]
385    pub fn read_consistency(&self) -> ConsistencyLevel {
386        self.read_consistency
387    }
388
389    /// Write consistency level applied to incoming writes on this
390    /// connection.
391    #[must_use]
392    pub fn write_consistency(&self) -> ConsistencyLevel {
393        self.write_consistency
394    }
395
396    /// Update the read consistency level.
397    pub fn set_read_consistency(&mut self, c: ConsistencyLevel) {
398        self.read_consistency = c;
399    }
400
401    /// Update the write consistency level.
402    pub fn set_write_consistency(&mut self, c: ConsistencyLevel) {
403        self.write_consistency = c;
404    }
405
406    /// True for peer connections inside the local datacenter.
407    #[must_use]
408    pub fn same_dc(&self) -> bool {
409        self.same_dc
410    }
411
412    /// Set the same-DC bit. The dnode-peer drivers update this
413    /// after consulting the snitch.
414    pub fn set_same_dc(&mut self, on: bool) {
415        self.same_dc = on;
416    }
417
418    /// True when the connection participates in the dnode peer
419    /// plane.
420    #[must_use]
421    pub fn dyn_mode(&self) -> bool {
422        self.dyn_mode
423    }
424
425    /// True when the dnode handshake exchanged a per-connection
426    /// AES key.
427    #[must_use]
428    pub fn dnode_secured(&self) -> bool {
429        self.dnode_secured
430    }
431
432    /// Mark the connection as secured (the handshake completed).
433    pub fn set_dnode_secured(&mut self, on: bool) {
434        self.dnode_secured = on;
435    }
436
437    /// True when the local end has emitted the encrypted AES key.
438    #[must_use]
439    pub fn crypto_key_sent(&self) -> bool {
440        self.crypto_key_sent
441    }
442
443    /// Mark the AES key as sent.
444    pub fn set_crypto_key_sent(&mut self, on: bool) {
445        self.crypto_key_sent = on;
446    }
447
448    /// Return the per-connection AES key, if one has been
449    /// installed.
450    #[must_use]
451    pub fn aes_key(&self) -> Option<&[u8; 32]> {
452        self.aes_key.as_ref()
453    }
454
455    /// Install (or replace) the per-connection AES key.
456    pub fn set_aes_key(&mut self, key: [u8; 32]) {
457        self.aes_key = Some(key);
458    }
459
460    /// Borrow the outstanding-msg map.
461    #[must_use]
462    pub fn outstanding(&self) -> &HashMap<MsgId, MsgId> {
463        &self.outstanding
464    }
465
466    /// Mutably borrow the outstanding-msg map.
467    pub fn outstanding_mut(&mut self) -> &mut HashMap<MsgId, MsgId> {
468        &mut self.outstanding
469    }
470
471    /// Borrow the pool used for fresh mbuf chunks.
472    #[must_use]
473    pub fn mbuf_pool(&self) -> &MbufPool {
474        &self.pool
475    }
476
477    /// Replace the mbuf pool. Useful when the embedding application
478    /// wants every connection to share a single pool.
479    pub fn set_mbuf_pool(&mut self, pool: MbufPool) {
480        self.pool = pool;
481    }
482
483    /// Take ownership of the underlying transport, leaving the
484    /// connection in a half-closed state. Returns `None` if the
485    /// transport has already been moved out (typically by
486    /// [`Conn::run`]).
487    pub fn take_transport(&mut self) -> Option<Box<dyn Transport>> {
488        self.transport.take()
489    }
490
491    /// Reinstall a transport. Used by reconnect logic in
492    /// [`crate::net::ConnPool`].
493    pub fn set_transport(&mut self, transport: Box<dyn Transport>) {
494        self.peer_addr = transport.peer_addr();
495        self.transport = Some(transport);
496    }
497
498    /// True when a transport is currently installed.
499    #[must_use]
500    pub fn has_transport(&self) -> bool {
501        self.transport.is_some()
502    }
503
504    /// Mutably borrow the transport. Driver loops use this to drive
505    /// `AsyncRead` / `AsyncWrite` directly.
506    pub fn transport_mut(&mut self) -> Option<&mut Box<dyn Transport>> {
507        self.transport.as_mut()
508    }
509
510    /// Enqueue a request onto the in-queue.
511    ///
512    /// # Errors
513    /// Returns [`NetError::PoolExhausted`] when the queue is at
514    /// [`MAX_CONN_QUEUE_SIZE`].
515    pub fn enqueue_in(&mut self, msg: Msg) -> Result<(), NetError> {
516        if self.imsg_q.len() >= MAX_CONN_QUEUE_SIZE {
517            return Err(NetError::PoolExhausted);
518        }
519        self.imsg_q.push_back(msg);
520        self.stats.recv_msgs += 1;
521        Ok(())
522    }
523
524    /// Enqueue a message onto the out-queue.
525    ///
526    /// # Errors
527    /// Returns [`NetError::PoolExhausted`] when the queue is at
528    /// [`MAX_CONN_QUEUE_SIZE`].
529    pub fn enqueue_out(&mut self, msg: Msg) -> Result<(), NetError> {
530        if self.omsg_q.len() >= MAX_CONN_QUEUE_SIZE {
531            return Err(NetError::PoolExhausted);
532        }
533        self.omsg_q.push_back(msg);
534        self.stats.send_msgs += 1;
535        Ok(())
536    }
537
538    /// Drop the transport and mark the connection as done.
539    pub fn close(&mut self) {
540        self.transport = None;
541        self.done = true;
542    }
543
544    /// Idle no-op driver hook.
545    ///
546    /// Stage 9 wires the PROXY / CLIENT / SERVER / DNODE_PEER_*
547    /// roles into dedicated drivers in the sibling modules. Calling
548    /// `run` on a [`Conn`] without first installing a driver does
549    /// nothing: the connection idles until [`Conn::close`] is
550    /// invoked. Real drivers (for example [`super::client::client_loop`])
551    /// take a `&mut Conn` directly.
552    ///
553    /// # Errors
554    ///
555    /// Returns [`NetError::Closed`] when the transport has already
556    /// been moved out, e.g. by an earlier driver.
557    pub fn run(&mut self) -> Result<(), NetError> {
558        if self.transport.is_none() {
559            return Err(NetError::Closed);
560        }
561        if self.done {
562            return Ok(());
563        }
564        Ok(())
565    }
566
567    /// Bump the recv-bytes counter.
568    pub fn record_recv(&mut self, bytes: usize) {
569        self.stats.recv_bytes += bytes as u64;
570        self.stats.recv_events += 1;
571    }
572
573    /// Bump the send-bytes counter.
574    pub fn record_send(&mut self, bytes: usize) {
575        self.stats.send_bytes += bytes as u64;
576        self.stats.send_events += 1;
577    }
578}
579
#[cfg(test)]
mod tests {
    use super::*;
    use crate::io::reactor::TcpTransport;
    use crate::msg::MsgType;
    use tokio::net::{TcpListener, TcpStream};

    /// Wrap a connected stream in a [`Conn`] whose transport and
    /// connection share the same role.
    fn wrap(stream: TcpStream, role: ConnRole) -> Conn {
        Conn::new(Box::new(TcpTransport::new(stream, role)), role)
    }

    /// Build a connected (client, server) pair of [`Conn`]s over a
    /// loopback listener bound to an ephemeral port.
    async fn pair() -> (Conn, Conn) {
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();
        let accepted = tokio::spawn(async move {
            let (stream, _) = listener.accept().await.unwrap();
            stream
        });
        let client_stream = TcpStream::connect(addr).await.unwrap();
        let server_stream = accepted.await.unwrap();
        (
            wrap(client_stream, ConnRole::Client),
            wrap(server_stream, ConnRole::Server),
        )
    }

    /// Enqueueing updates both the queue lengths and the message
    /// counters.
    #[tokio::test]
    async fn enqueue_in_and_out() {
        let (mut conn, _peer) = pair().await;
        conn.enqueue_in(Msg::new(1, MsgType::ReqRedisGet, true))
            .unwrap();
        conn.enqueue_out(Msg::new(2, MsgType::RspRedisStatus, false))
            .unwrap();
        assert_eq!(conn.imsg_q().len(), 1);
        assert_eq!(conn.omsg_q().len(), 1);
        assert_eq!(conn.stats().recv_msgs, 1);
        assert_eq!(conn.stats().send_msgs, 1);
    }

    /// `close` releases the transport and marks the connection done.
    #[tokio::test]
    async fn close_drops_transport() {
        let (mut conn, _peer) = pair().await;
        assert!(conn.has_transport());
        conn.close();
        assert!(!conn.has_transport());
        assert!(conn.is_done());
    }

    /// Two connections never share a handle.
    #[tokio::test]
    async fn handle_is_unique() {
        let (first, second) = pair().await;
        assert_ne!(first.handle(), second.handle());
    }

    /// A dnode peer role switches `dyn_mode` on; `same_dc` keeps its
    /// default.
    #[tokio::test]
    async fn role_seed_drives_dyn_mode() {
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();
        let _accept = tokio::spawn(async move {
            let (stream, _) = listener.accept().await.unwrap();
            drop(stream);
        });
        let stream = TcpStream::connect(addr).await.unwrap();
        let conn = wrap(stream, ConnRole::DnodePeerServer);
        assert!(conn.dyn_mode());
        assert!(conn.same_dc());
    }
}