dynomite/net/client.rs
1//! CLIENT-role connection driver.
2//!
3//! The CLIENT role is the engine's view of an inbound connection
4//! from a Redis or memcache client. The driver:
5//!
//! 1. Reads bytes from the [`crate::io::reactor::Transport`] into
//!    a per-connection accumulation buffer.
8//! 2. Drives the appropriate datastore parser
9//! ([`crate::proto::redis::redis_parse_req`] or
10//! [`crate::proto::memcache::memcache_parse_req`]) over the
11//! chain.
12//! 3. Hands every fully-parsed request to the configured
13//! [`crate::net::Dispatcher`] and waits for a response on the
14//! per-connection mpsc channel.
15//! 4. Writes the response bytes back to the transport.
16//!
//! Each iteration first flushes any buffered response bytes to the
//! transport, then runs a single tokio `select!` that waits for either
//! more request bytes or the next response on the channel.
20//!
21//! The Stage 9 implementation does not yet reach into the cluster
22//! layer (Stage 10) or the entropy reconciliation (Stage 11);
23//! those plug in through the [`Dispatcher`] hook the proxy
24//! installs.
25
26use std::sync::Arc;
27use std::time::Duration;
28
29use tokio::io::{AsyncReadExt, AsyncWriteExt};
30use tokio::sync::mpsc;
31
32use crate::conf::DataStore;
33use crate::core::types::MsgId;
34use crate::io::reactor::ConnRole;
35use crate::msg::{response, Msg, MsgParseResult, MsgType};
36use crate::net::conn::Conn;
37use crate::net::dispatcher::{DispatchOutcome, Dispatcher, OutboundEnvelope};
38use crate::net::NetError;
39
/// Size, in bytes, of the scratch buffer the client driver reads into.
///
/// A single read copies at most this many bytes into the driver's
/// accumulation buffer; requests longer than one chunk are assembled
/// across successive reads before the parser sees them complete.
const CLIENT_READ_CHUNK: usize = 4 * 1024;
46
/// How a client connection driver finished.
///
/// NOTE(review): [`client_loop`] returns `Result<(), NetError>` and does
/// not currently produce this value; it is kept for callers that want
/// to classify a finished connection.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum ClientLoopOutcome {
    /// The peer closed its side (EOF) and the queues drained cleanly.
    Eof,
    /// The driver was asked to exit by the dispatcher.
    Cancelled,
}
55
56/// Client-side request handler bundle.
57///
58/// Built by [`crate::net::proxy::Proxy`] and passed into
59/// [`client_loop`].
60pub struct ClientHandler {
61 dispatcher: Arc<dyn Dispatcher>,
62 response_tx: mpsc::Sender<OutboundEnvelope>,
63 data_store: DataStore,
64 next_msg_id: u64,
65 read_timeout: Option<Duration>,
66 gossip: Option<Arc<crate::cluster::gossip::GossipHandler>>,
67}
68
69impl ClientHandler {
70 /// Build a client handler.
71 ///
72 /// # Examples
73 ///
74 /// ```
75 /// use dynomite::conf::DataStore;
76 /// use dynomite::net::{ClientHandler, NoopDispatcher};
77 /// use std::sync::Arc;
78 /// use tokio::sync::mpsc;
79 /// let (tx, _rx) = mpsc::channel(1);
80 /// let _h = ClientHandler::new(Arc::new(NoopDispatcher), tx, DataStore::Redis);
81 /// ```
82 #[must_use]
83 pub fn new(
84 dispatcher: Arc<dyn Dispatcher>,
85 response_tx: mpsc::Sender<OutboundEnvelope>,
86 data_store: DataStore,
87 ) -> Self {
88 Self {
89 dispatcher,
90 response_tx,
91 data_store,
92 next_msg_id: 1,
93 read_timeout: None,
94 gossip: None,
95 }
96 }
97
98 /// Set the per-read timeout. None disables it.
99 #[must_use]
100 pub fn with_read_timeout(mut self, t: Option<Duration>) -> Self {
101 self.read_timeout = t;
102 self
103 }
104
105 /// Attach a gossip handler. Inbound peer connections served
106 /// through this handler dispatch gossip-class dnode frames
107 /// into the supplied handler instead of the datastore parser.
108 /// Data-plane connections (CLIENT role) leave it `None`.
109 #[must_use]
110 pub fn with_gossip(mut self, gossip: Arc<crate::cluster::gossip::GossipHandler>) -> Self {
111 self.gossip = Some(gossip);
112 self
113 }
114
115 /// Borrow the attached gossip handler, if any.
116 #[must_use]
117 pub fn gossip(&self) -> Option<&Arc<crate::cluster::gossip::GossipHandler>> {
118 self.gossip.as_ref()
119 }
120
121 /// Datastore the handler parses.
122 #[must_use]
123 pub fn data_store(&self) -> DataStore {
124 self.data_store
125 }
126
127 /// Borrow the dispatcher this handler routes parsed requests
128 /// into. Exposed so role-specific drivers (CLIENT,
129 /// DNODE_PEER_CLIENT) can share the same dispatch contract.
130 ///
131 /// # Examples
132 ///
133 /// ```
134 /// use dynomite::conf::DataStore;
135 /// use dynomite::net::{ClientHandler, NoopDispatcher};
136 /// use std::sync::Arc;
137 /// use tokio::sync::mpsc;
138 /// let (tx, _rx) = mpsc::channel(1);
139 /// let h = ClientHandler::new(Arc::new(NoopDispatcher), tx, DataStore::Redis);
140 /// let _ = h.dispatcher();
141 /// ```
142 #[must_use]
143 pub fn dispatcher(&self) -> &Arc<dyn Dispatcher> {
144 &self.dispatcher
145 }
146
147 /// Borrow the per-connection response sender. The dispatcher
148 /// uses a clone of this channel to push asynchronously-produced
149 /// responses back to the FSM.
150 ///
151 /// # Examples
152 ///
153 /// ```
154 /// use dynomite::conf::DataStore;
155 /// use dynomite::net::{ClientHandler, NoopDispatcher};
156 /// use std::sync::Arc;
157 /// use tokio::sync::mpsc;
158 /// let (tx, _rx) = mpsc::channel(1);
159 /// let h = ClientHandler::new(Arc::new(NoopDispatcher), tx, DataStore::Redis);
160 /// let _clone = h.response_tx().clone();
161 /// ```
162 #[must_use]
163 pub fn response_tx(&self) -> &mpsc::Sender<OutboundEnvelope> {
164 &self.response_tx
165 }
166
167 fn alloc_msg_id(&mut self) -> MsgId {
168 let id = self.next_msg_id;
169 self.next_msg_id = self.next_msg_id.wrapping_add(1).max(1);
170 id
171 }
172}
173
174/// Drive the client FSM until the peer closes or the dispatcher
175/// asks the driver to exit.
176///
177/// `rx` receives responses produced by the dispatcher; the driver
178/// writes the response bytes to the transport in the order it
179/// received them.
180///
181/// # Errors
182/// Any transport-level error is returned. Parse errors are surfaced
183/// as [`NetError::Parse`] and end the loop after sending a synthetic
184/// error response when possible.
185#[tracing::instrument(
186 name = "client_loop",
187 skip_all,
188 fields(
189 role = ?conn.role(),
190 peer = tracing::field::Empty,
191 ),
192)]
193pub async fn client_loop(
194 mut conn: Conn,
195 mut handler: ClientHandler,
196 mut rx: mpsc::Receiver<OutboundEnvelope>,
197) -> Result<(), NetError> {
198 debug_assert!(matches!(
199 conn.role(),
200 ConnRole::Client | ConnRole::DnodePeerClient
201 ));
202
203 let mut read_buf = vec![0u8; CLIENT_READ_CHUNK];
204 let mut accumulated: Vec<u8> = Vec::new();
205 let mut pending_writes: Vec<u8> = Vec::new();
206
207 loop {
208 // Flush any buffered response bytes first so the loop
209 // exit conditions never block on a full peer.
210 if !pending_writes.is_empty() {
211 let transport = conn.transport_mut().ok_or(NetError::Closed)?;
212 transport.write_all(&pending_writes).await?;
213 conn.record_send(pending_writes.len());
214 pending_writes.clear();
215 }
216
217 if conn.is_eof() && conn.imsg_q().is_empty() && conn.omsg_q().is_empty() {
218 conn.set_done();
219 return Ok(());
220 }
221
222 let read_fut = async {
223 let n = match conn.transport_mut() {
224 Some(t) => t.read(&mut read_buf).await,
225 None => return Ok::<usize, std::io::Error>(0),
226 };
227 n
228 };
229
230 tokio::select! {
231 res = read_fut => {
232 let n = res?;
233 if n == 0 {
234 conn.set_eof();
235 if conn.omsg_q().is_empty() {
236 conn.set_done();
237 return Ok(());
238 }
239 continue;
240 }
241 conn.record_recv(n);
242 accumulated.extend_from_slice(&read_buf[..n]);
243 drive_parser(&mut conn, &mut handler, &mut accumulated).await?;
244 }
245 Some(env) = rx.recv() => {
246 handle_response(&mut conn, &env, &mut pending_writes);
247 }
248 else => {
249 conn.set_done();
250 return Ok(());
251 }
252 }
253 }
254}
255
256#[tracing::instrument(
257 name = "client.parse_loop",
258 skip_all,
259 fields(accumulated = accumulated.len()),
260)]
261async fn drive_parser(
262 conn: &mut Conn,
263 handler: &mut ClientHandler,
264 accumulated: &mut Vec<u8>,
265) -> Result<(), NetError> {
266 use crate::proto::memcache::memcache_parse_req;
267 use crate::proto::redis::redis_parse_req;
268
269 while !accumulated.is_empty() {
270 let id = handler.alloc_msg_id();
271 let mut msg = Msg::new(id, MsgType::Unknown, true);
272 let consumed_before = msg.parser_pos();
273 let parse_result = match handler.data_store {
274 DataStore::Redis | DataStore::Noxu => redis_parse_req(&mut msg, accumulated),
275 DataStore::Memcache => memcache_parse_req(&mut msg, accumulated),
276 };
277 match parse_result {
278 MsgParseResult::Ok => {
279 let consumed = msg.parser_pos();
280 if consumed == 0 {
281 return Err(NetError::Parse(
282 "parser reported Ok with no bytes consumed".to_string(),
283 ));
284 }
285 // Per-request span - one is created for every
286 // fully-parsed inbound message. Cross-task work
287 // (dispatch.plan, backend.send / parse, peer.send /
288 // parse, client.send) attaches as children via the
289 // captured `tracing::Span` on the OutboundRequest /
290 // OutboundEnvelope envelopes.
291 let req_span = tracing::info_span!(
292 "client.parse",
293 msg_id = msg.id(),
294 msg_type = ?msg.ty(),
295 bytes = consumed,
296 );
297 let was_quit = msg.flags().quit;
298 let quit_msg_id = if was_quit { Some(msg.id()) } else { None };
299 let inline_send: Option<OutboundEnvelope> = req_span.in_scope(|| {
300 // Carry the consumed wire bytes inside the
301 // msg so the dispatcher can forward them to a
302 // backend without having to re-encode. The C
303 // engine keeps the request bytes in the
304 // inbound mbuf chain across recv -> filter
305 // -> forward; the Rust port stores them on
306 // the msg's own mbuf chain instead.
307 let pool = conn.mbuf_pool().clone();
308 let mut buf = pool.get();
309 buf.recv(&accumulated[..consumed]);
310 msg.mbufs_mut().push_back(buf);
311 msg.recompute_mlen();
312 accumulated.drain(0..consumed);
313 let _ = consumed_before;
314 conn.outstanding_mut().insert(msg.id(), msg.id());
315 // The placeholder enqueue cannot fail under
316 // normal operation; if it does we surface it
317 // via the outer `?`.
318 conn.enqueue_out(Msg::new(msg.id(), msg.ty(), true))
319 .map_err(|e: NetError| e)?;
320 let outcome = handler
321 .dispatcher
322 .dispatch(msg, handler.response_tx.clone());
323 let inline = match outcome {
324 DispatchOutcome::Pending | DispatchOutcome::Drop => None,
325 DispatchOutcome::Inline(rsp) | DispatchOutcome::Error(rsp) => {
326 Some(OutboundEnvelope {
327 req_id: rsp.id(),
328 rsp,
329 span: tracing::Span::current(),
330 source_peer_idx: None,
331 })
332 }
333 };
334 Ok::<Option<OutboundEnvelope>, NetError>(inline)
335 })?;
336 if let Some(env) = inline_send {
337 let _ = handler.response_tx.send(env).await;
338 }
339 if let Some(qid) = quit_msg_id {
340 // Real Redis replies `+OK\r\n` to QUIT and then
341 // closes the client connection. Synthesize the
342 // reply here (the dispatcher returned `Drop`
343 // because there is no key to route) and send it
344 // through the same `response_tx` used by every
345 // other reply, which pops the placeholder we
346 // pushed onto `omsg_q` above. Without this the
347 // outer client loop's exit condition
348 // (`omsg_q.is_empty()`) is never met and the
349 // connection deadlocks until the kernel times
350 // out the read.
351 let pool = conn.mbuf_pool().clone();
352 let mut anchor = Msg::new(qid, MsgType::ReqRedisQuit, true);
353 anchor.set_parent_id(qid);
354 let rsp = response::make_simple_redis(&anchor, &pool, b"+OK\r\n");
355 let env = OutboundEnvelope {
356 req_id: qid,
357 rsp,
358 span: req_span.clone(),
359 source_peer_idx: None,
360 };
361 let _ = handler.response_tx.send(env).await;
362 // Mirror the C engine: close after replying.
363 conn.set_eof();
364 return Ok(());
365 }
366 }
367 MsgParseResult::Again
368 | MsgParseResult::Repair
369 | MsgParseResult::Fragment
370 | MsgParseResult::Noop => {
371 let consumed = msg.parser_pos();
372 if consumed > 0 {
373 accumulated.drain(0..consumed);
374 } else {
375 return Ok(());
376 }
377 }
378 MsgParseResult::Error | MsgParseResult::OomError | MsgParseResult::DynoConfig => {
379 return Err(NetError::Parse(format!("{parse_result:?}")));
380 }
381 }
382 }
383 Ok(())
384}
385
386fn handle_response(conn: &mut Conn, env: &OutboundEnvelope, pending: &mut Vec<u8>) {
387 let _enter = env.span.enter();
388 let bytes_len: usize = env.rsp.mbufs().iter().map(|b| b.readable().len()).sum();
389 let _send_span =
390 tracing::info_span!("client.send", req_id = env.req_id, bytes = bytes_len,).entered();
391 for buf in env.rsp.mbufs() {
392 pending.extend_from_slice(buf.readable());
393 }
394 // Pop the matching outstanding entry.
395 conn.outstanding_mut().remove(&env.req_id);
396 // Pop the placeholder we pushed on enqueue.
397 if let Some(front) = conn.omsg_q_mut().front() {
398 if front.id() == env.req_id {
399 let _ = conn.omsg_q_mut().pop_front();
400 }
401 }
402}
403
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a Redis handler backed by the no-op dispatcher.
    fn handler() -> ClientHandler {
        let (tx, _rx) = mpsc::channel(1);
        ClientHandler::new(Arc::new(crate::net::NoopDispatcher), tx, DataStore::Redis)
    }

    #[test]
    fn alloc_msg_id_is_monotonic() {
        let mut h = handler();
        let a = h.alloc_msg_id();
        let b = h.alloc_msg_id();
        assert_eq!(a + 1, b);
    }

    /// Ids start at 1 and the counter skips 0 when it wraps, so 0 is
    /// never handed out.
    #[test]
    fn alloc_msg_id_starts_at_one_and_skips_zero_on_wrap() {
        let mut h = handler();
        assert_eq!(h.alloc_msg_id(), 1);
        h.next_msg_id = u64::MAX;
        assert_eq!(h.alloc_msg_id(), u64::MAX);
        assert_eq!(h.alloc_msg_id(), 1);
    }
}