Skip to main content

dynomite/net/
client.rs

1//! CLIENT-role connection driver.
2//!
3//! The CLIENT role is the engine's view of an inbound connection
4//! from a Redis or memcache client. The driver:
5//!
6//! 1. Reads bytes from the [`crate::io::reactor::Transport`] into
7//!    the connection's recv mbuf chain.
8//! 2. Drives the appropriate datastore parser
9//!    ([`crate::proto::redis::redis_parse_req`] or
10//!    [`crate::proto::memcache::memcache_parse_req`]) over the
11//!    chain.
12//! 3. Hands every fully-parsed request to the configured
13//!    [`crate::net::Dispatcher`] and waits for a response on the
14//!    per-connection mpsc channel.
15//! 4. Writes the response bytes back to the transport.
16//!
17//! The driver runs a single tokio `select!` per iteration, draining
18//! pending response bytes first so the loop's read / write
19//! arms never block on a saturated peer.
20//!
21//! The Stage 9 implementation does not yet reach into the cluster
22//! layer (Stage 10) or the entropy reconciliation (Stage 11);
23//! those plug in through the [`Dispatcher`] hook the proxy
24//! installs.
25
26use std::sync::Arc;
27use std::time::Duration;
28
29use tokio::io::{AsyncReadExt, AsyncWriteExt};
30use tokio::sync::mpsc;
31
32use crate::conf::DataStore;
33use crate::core::types::MsgId;
34use crate::io::reactor::ConnRole;
35use crate::msg::{response, Msg, MsgParseResult, MsgType};
36use crate::net::conn::Conn;
37use crate::net::dispatcher::{DispatchOutcome, Dispatcher, OutboundEnvelope};
38use crate::net::NetError;
39
/// Number of bytes requested from the transport on each read by the
/// client driver.
///
/// Small inline commands normally arrive within a single chunk; larger
/// requests are accumulated across several reads before parsing
/// completes.
const CLIENT_READ_CHUNK: usize = 4 * 1024;
46
/// Terminal state of a client driver.
///
/// NOTE(review): `client_loop` in this module currently returns
/// `Result<(), NetError>` and does not produce this type; other
/// modules may still use it, so it is kept unchanged.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum ClientLoopOutcome {
    /// The peer closed its side (EOF) and the queues drained cleanly.
    Eof,
    /// The driver was asked to exit by the dispatcher.
    Cancelled,
}
55
56/// Client-side request handler bundle.
57///
58/// Built by [`crate::net::proxy::Proxy`] and passed into
59/// [`client_loop`].
60pub struct ClientHandler {
61    dispatcher: Arc<dyn Dispatcher>,
62    response_tx: mpsc::Sender<OutboundEnvelope>,
63    data_store: DataStore,
64    next_msg_id: u64,
65    read_timeout: Option<Duration>,
66    gossip: Option<Arc<crate::cluster::gossip::GossipHandler>>,
67}
68
69impl ClientHandler {
70    /// Build a client handler.
71    ///
72    /// # Examples
73    ///
74    /// ```
75    /// use dynomite::conf::DataStore;
76    /// use dynomite::net::{ClientHandler, NoopDispatcher};
77    /// use std::sync::Arc;
78    /// use tokio::sync::mpsc;
79    /// let (tx, _rx) = mpsc::channel(1);
80    /// let _h = ClientHandler::new(Arc::new(NoopDispatcher), tx, DataStore::Redis);
81    /// ```
82    #[must_use]
83    pub fn new(
84        dispatcher: Arc<dyn Dispatcher>,
85        response_tx: mpsc::Sender<OutboundEnvelope>,
86        data_store: DataStore,
87    ) -> Self {
88        Self {
89            dispatcher,
90            response_tx,
91            data_store,
92            next_msg_id: 1,
93            read_timeout: None,
94            gossip: None,
95        }
96    }
97
98    /// Set the per-read timeout. None disables it.
99    #[must_use]
100    pub fn with_read_timeout(mut self, t: Option<Duration>) -> Self {
101        self.read_timeout = t;
102        self
103    }
104
105    /// Attach a gossip handler. Inbound peer connections served
106    /// through this handler dispatch gossip-class dnode frames
107    /// into the supplied handler instead of the datastore parser.
108    /// Data-plane connections (CLIENT role) leave it `None`.
109    #[must_use]
110    pub fn with_gossip(mut self, gossip: Arc<crate::cluster::gossip::GossipHandler>) -> Self {
111        self.gossip = Some(gossip);
112        self
113    }
114
115    /// Borrow the attached gossip handler, if any.
116    #[must_use]
117    pub fn gossip(&self) -> Option<&Arc<crate::cluster::gossip::GossipHandler>> {
118        self.gossip.as_ref()
119    }
120
121    /// Datastore the handler parses.
122    #[must_use]
123    pub fn data_store(&self) -> DataStore {
124        self.data_store
125    }
126
127    /// Borrow the dispatcher this handler routes parsed requests
128    /// into. Exposed so role-specific drivers (CLIENT,
129    /// DNODE_PEER_CLIENT) can share the same dispatch contract.
130    ///
131    /// # Examples
132    ///
133    /// ```
134    /// use dynomite::conf::DataStore;
135    /// use dynomite::net::{ClientHandler, NoopDispatcher};
136    /// use std::sync::Arc;
137    /// use tokio::sync::mpsc;
138    /// let (tx, _rx) = mpsc::channel(1);
139    /// let h = ClientHandler::new(Arc::new(NoopDispatcher), tx, DataStore::Redis);
140    /// let _ = h.dispatcher();
141    /// ```
142    #[must_use]
143    pub fn dispatcher(&self) -> &Arc<dyn Dispatcher> {
144        &self.dispatcher
145    }
146
147    /// Borrow the per-connection response sender. The dispatcher
148    /// uses a clone of this channel to push asynchronously-produced
149    /// responses back to the FSM.
150    ///
151    /// # Examples
152    ///
153    /// ```
154    /// use dynomite::conf::DataStore;
155    /// use dynomite::net::{ClientHandler, NoopDispatcher};
156    /// use std::sync::Arc;
157    /// use tokio::sync::mpsc;
158    /// let (tx, _rx) = mpsc::channel(1);
159    /// let h = ClientHandler::new(Arc::new(NoopDispatcher), tx, DataStore::Redis);
160    /// let _clone = h.response_tx().clone();
161    /// ```
162    #[must_use]
163    pub fn response_tx(&self) -> &mpsc::Sender<OutboundEnvelope> {
164        &self.response_tx
165    }
166
167    fn alloc_msg_id(&mut self) -> MsgId {
168        let id = self.next_msg_id;
169        self.next_msg_id = self.next_msg_id.wrapping_add(1).max(1);
170        id
171    }
172}
173
174/// Drive the client FSM until the peer closes or the dispatcher
175/// asks the driver to exit.
176///
177/// `rx` receives responses produced by the dispatcher; the driver
178/// writes the response bytes to the transport in the order it
179/// received them.
180///
181/// # Errors
182/// Any transport-level error is returned. Parse errors are surfaced
183/// as [`NetError::Parse`] and end the loop after sending a synthetic
184/// error response when possible.
185#[tracing::instrument(
186    name = "client_loop",
187    skip_all,
188    fields(
189        role = ?conn.role(),
190        peer = tracing::field::Empty,
191    ),
192)]
193pub async fn client_loop(
194    mut conn: Conn,
195    mut handler: ClientHandler,
196    mut rx: mpsc::Receiver<OutboundEnvelope>,
197) -> Result<(), NetError> {
198    debug_assert!(matches!(
199        conn.role(),
200        ConnRole::Client | ConnRole::DnodePeerClient
201    ));
202
203    let mut read_buf = vec![0u8; CLIENT_READ_CHUNK];
204    let mut accumulated: Vec<u8> = Vec::new();
205    let mut pending_writes: Vec<u8> = Vec::new();
206
207    loop {
208        // Flush any buffered response bytes first so the loop
209        // exit conditions never block on a full peer.
210        if !pending_writes.is_empty() {
211            let transport = conn.transport_mut().ok_or(NetError::Closed)?;
212            transport.write_all(&pending_writes).await?;
213            conn.record_send(pending_writes.len());
214            pending_writes.clear();
215        }
216
217        if conn.is_eof() && conn.imsg_q().is_empty() && conn.omsg_q().is_empty() {
218            conn.set_done();
219            return Ok(());
220        }
221
222        let read_fut = async {
223            let n = match conn.transport_mut() {
224                Some(t) => t.read(&mut read_buf).await,
225                None => return Ok::<usize, std::io::Error>(0),
226            };
227            n
228        };
229
230        tokio::select! {
231            res = read_fut => {
232                let n = res?;
233                if n == 0 {
234                    conn.set_eof();
235                    if conn.omsg_q().is_empty() {
236                        conn.set_done();
237                        return Ok(());
238                    }
239                    continue;
240                }
241                conn.record_recv(n);
242                accumulated.extend_from_slice(&read_buf[..n]);
243                drive_parser(&mut conn, &mut handler, &mut accumulated).await?;
244            }
245            Some(env) = rx.recv() => {
246                handle_response(&mut conn, &env, &mut pending_writes);
247            }
248            else => {
249                conn.set_done();
250                return Ok(());
251            }
252        }
253    }
254}
255
256#[tracing::instrument(
257    name = "client.parse_loop",
258    skip_all,
259    fields(accumulated = accumulated.len()),
260)]
261async fn drive_parser(
262    conn: &mut Conn,
263    handler: &mut ClientHandler,
264    accumulated: &mut Vec<u8>,
265) -> Result<(), NetError> {
266    use crate::proto::memcache::memcache_parse_req;
267    use crate::proto::redis::redis_parse_req;
268
269    while !accumulated.is_empty() {
270        let id = handler.alloc_msg_id();
271        let mut msg = Msg::new(id, MsgType::Unknown, true);
272        let consumed_before = msg.parser_pos();
273        let parse_result = match handler.data_store {
274            DataStore::Redis | DataStore::Noxu => redis_parse_req(&mut msg, accumulated),
275            DataStore::Memcache => memcache_parse_req(&mut msg, accumulated),
276        };
277        match parse_result {
278            MsgParseResult::Ok => {
279                let consumed = msg.parser_pos();
280                if consumed == 0 {
281                    return Err(NetError::Parse(
282                        "parser reported Ok with no bytes consumed".to_string(),
283                    ));
284                }
285                // Per-request span - one is created for every
286                // fully-parsed inbound message. Cross-task work
287                // (dispatch.plan, backend.send / parse, peer.send /
288                // parse, client.send) attaches as children via the
289                // captured `tracing::Span` on the OutboundRequest /
290                // OutboundEnvelope envelopes.
291                let req_span = tracing::info_span!(
292                    "client.parse",
293                    msg_id = msg.id(),
294                    msg_type = ?msg.ty(),
295                    bytes = consumed,
296                );
297                let was_quit = msg.flags().quit;
298                let quit_msg_id = if was_quit { Some(msg.id()) } else { None };
299                let inline_send: Option<OutboundEnvelope> = req_span.in_scope(|| {
300                    // Carry the consumed wire bytes inside the
301                    // msg so the dispatcher can forward them to a
302                    // backend without having to re-encode. The C
303                    // engine keeps the request bytes in the
304                    // inbound mbuf chain across recv -> filter
305                    // -> forward; the Rust port stores them on
306                    // the msg's own mbuf chain instead.
307                    let pool = conn.mbuf_pool().clone();
308                    let mut buf = pool.get();
309                    buf.recv(&accumulated[..consumed]);
310                    msg.mbufs_mut().push_back(buf);
311                    msg.recompute_mlen();
312                    accumulated.drain(0..consumed);
313                    let _ = consumed_before;
314                    conn.outstanding_mut().insert(msg.id(), msg.id());
315                    // The placeholder enqueue cannot fail under
316                    // normal operation; if it does we surface it
317                    // via the outer `?`.
318                    conn.enqueue_out(Msg::new(msg.id(), msg.ty(), true))
319                        .map_err(|e: NetError| e)?;
320                    let outcome = handler
321                        .dispatcher
322                        .dispatch(msg, handler.response_tx.clone());
323                    let inline = match outcome {
324                        DispatchOutcome::Pending | DispatchOutcome::Drop => None,
325                        DispatchOutcome::Inline(rsp) | DispatchOutcome::Error(rsp) => {
326                            Some(OutboundEnvelope {
327                                req_id: rsp.id(),
328                                rsp,
329                                span: tracing::Span::current(),
330                                source_peer_idx: None,
331                            })
332                        }
333                    };
334                    Ok::<Option<OutboundEnvelope>, NetError>(inline)
335                })?;
336                if let Some(env) = inline_send {
337                    let _ = handler.response_tx.send(env).await;
338                }
339                if let Some(qid) = quit_msg_id {
340                    // Real Redis replies `+OK\r\n` to QUIT and then
341                    // closes the client connection. Synthesize the
342                    // reply here (the dispatcher returned `Drop`
343                    // because there is no key to route) and send it
344                    // through the same `response_tx` used by every
345                    // other reply, which pops the placeholder we
346                    // pushed onto `omsg_q` above. Without this the
347                    // outer client loop's exit condition
348                    // (`omsg_q.is_empty()`) is never met and the
349                    // connection deadlocks until the kernel times
350                    // out the read.
351                    let pool = conn.mbuf_pool().clone();
352                    let mut anchor = Msg::new(qid, MsgType::ReqRedisQuit, true);
353                    anchor.set_parent_id(qid);
354                    let rsp = response::make_simple_redis(&anchor, &pool, b"+OK\r\n");
355                    let env = OutboundEnvelope {
356                        req_id: qid,
357                        rsp,
358                        span: req_span.clone(),
359                        source_peer_idx: None,
360                    };
361                    let _ = handler.response_tx.send(env).await;
362                    // Mirror the C engine: close after replying.
363                    conn.set_eof();
364                    return Ok(());
365                }
366            }
367            MsgParseResult::Again
368            | MsgParseResult::Repair
369            | MsgParseResult::Fragment
370            | MsgParseResult::Noop => {
371                let consumed = msg.parser_pos();
372                if consumed > 0 {
373                    accumulated.drain(0..consumed);
374                } else {
375                    return Ok(());
376                }
377            }
378            MsgParseResult::Error | MsgParseResult::OomError | MsgParseResult::DynoConfig => {
379                return Err(NetError::Parse(format!("{parse_result:?}")));
380            }
381        }
382    }
383    Ok(())
384}
385
386fn handle_response(conn: &mut Conn, env: &OutboundEnvelope, pending: &mut Vec<u8>) {
387    let _enter = env.span.enter();
388    let bytes_len: usize = env.rsp.mbufs().iter().map(|b| b.readable().len()).sum();
389    let _send_span =
390        tracing::info_span!("client.send", req_id = env.req_id, bytes = bytes_len,).entered();
391    for buf in env.rsp.mbufs() {
392        pending.extend_from_slice(buf.readable());
393    }
394    // Pop the matching outstanding entry.
395    conn.outstanding_mut().remove(&env.req_id);
396    // Pop the placeholder we pushed on enqueue.
397    if let Some(front) = conn.omsg_q_mut().front() {
398        if front.id() == env.req_id {
399            let _ = conn.omsg_q_mut().pop_front();
400        }
401    }
402}
403
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a Redis handler backed by the no-op dispatcher.
    fn handler() -> ClientHandler {
        let (tx, _rx) = mpsc::channel(1);
        ClientHandler::new(Arc::new(crate::net::NoopDispatcher), tx, DataStore::Redis)
    }

    #[test]
    fn alloc_msg_id_is_monotonic() {
        let mut h = handler();
        let a = h.alloc_msg_id();
        let b = h.alloc_msg_id();
        assert_eq!(a + 1, b);
    }

    #[test]
    fn alloc_msg_id_starts_at_one() {
        assert_eq!(handler().alloc_msg_id(), 1);
    }

    #[test]
    fn alloc_msg_id_skips_zero_on_wrap() {
        let mut h = handler();
        h.next_msg_id = u64::MAX;
        assert_eq!(h.alloc_msg_id(), u64::MAX);
        assert_eq!(h.alloc_msg_id(), 1);
    }

    #[test]
    fn builders_and_getters_reflect_configuration() {
        let h = handler().with_read_timeout(Some(Duration::from_millis(5)));
        assert_eq!(h.read_timeout, Some(Duration::from_millis(5)));
        assert!(h.gossip().is_none());
        assert!(matches!(h.data_store(), DataStore::Redis));
    }
}