dynomite/net/dnode_client.rs
1//! Inbound peer-connection driver for the dnode peer plane.
2//!
3//! The local node is the receiver. The driver:
4//!
5//! 1. Reads bytes off the transport into a contiguous buffer.
6//! 2. Drives the dnode header parser ([`crate::proto::dnode::DnodeParser`])
7//! over the buffer until a full `Dmsg` header has been observed.
8//! 3. If the header marks the payload as encrypted, decrypts it
9//! using the per-connection AES key bound during the handshake
10//! via [`crate::crypto::Crypto`]. When the header indicates a
11//! plaintext payload (the peer-plane was negotiated unsecured),
12//! the bytes pass through unchanged.
13//! 4. Drives the datastore parser over the (decrypted) payload to
14//! reconstruct a [`Msg`].
15//! 5. Hands the parsed [`Msg`] to the supplied
16//! [`ClientHandler`]'s dispatcher and routes the dispatcher's
17//! response back through the per-connection responder channel.
18
19use tokio::io::{AsyncReadExt, AsyncWriteExt};
20use tokio::sync::mpsc;
21
22use crate::msg::Msg;
23use crate::msg::MsgParseResult;
24use crate::msg::MsgType;
25use crate::net::client::ClientHandler;
26use crate::net::conn::Conn;
27use crate::net::dispatcher::OutboundEnvelope;
28use crate::net::NetError;
29use crate::proto::dnode::{DmsgType, DnodeParser, ParseStep};
30
31/// Type alias for the dnode client handler bundle.
32pub type DnodeClientHandler = ClientHandler;
33
34/// Drive a DNODE_PEER_CLIENT FSM until the peer closes.
35///
36/// `rx` receives responses produced by the cluster dispatcher; the
37/// driver writes the response bytes back through the same
38/// transport.
39///
40/// # Errors
41/// Surfaces transport- and DNODE-level errors.
42pub async fn dnode_client_loop(
43 mut conn: Conn,
44 handler: ClientHandler,
45 mut rx: mpsc::Receiver<OutboundEnvelope>,
46) -> Result<(), NetError> {
47 let mut read_buf = vec![0u8; 4096];
48 let mut accumulated = Vec::<u8>::new();
49 let mut parser = DnodeParser::new();
50
51 loop {
52 if conn.is_eof() && conn.imsg_q().is_empty() && conn.omsg_q().is_empty() {
53 conn.set_done();
54 return Ok(());
55 }
56
57 tokio::select! {
58 res = async {
59 if let Some(t) = conn.transport_mut() {
60 t.read(&mut read_buf).await
61 } else {
62 Ok(0)
63 }
64 } => {
65 let n = res?;
66 if n == 0 {
67 conn.set_eof();
68 continue;
69 }
70 conn.record_recv(n);
71 accumulated.extend_from_slice(&read_buf[..n]);
72 drive_dnode_parser(&mut conn, &handler, &mut accumulated, &mut parser).await?;
73 }
74 Some(env) = rx.recv() => {
75 // Forward dispatcher-produced responses back to
76 // the peer over this same transport. The peer-
77 // originator's `DnodeServerConn` parses incoming
78 // bytes with `DnodeParser`, so the response must
79 // be dnode-framed (header + payload). Without
80 // this header the originator's parser hangs in
81 // `NeedMore`, the dispatcher's `responder` mpsc
82 // never gets the reply, and the originating
83 // client times out.
84 let bytes: Vec<u8> = env
85 .rsp
86 .mbufs()
87 .iter()
88 .flat_map(|b| b.readable().to_vec())
89 .collect();
90 if !bytes.is_empty() {
91 let mut header_buf = conn.mbuf_pool().get();
92 crate::proto::dnode::dmsg_write(
93 &mut header_buf,
94 env.req_id,
95 crate::proto::dnode::DmsgType::Res,
96 0,
97 true,
98 None,
99 u32::try_from(bytes.len()).unwrap_or(u32::MAX),
100 )
101 .map_err(|e| NetError::Dnode(format!("{e:?}")))?;
102 let header_len = header_buf.readable().len();
103 if let Some(t) = conn.transport_mut() {
104 t.write_all(header_buf.readable()).await?;
105 t.write_all(&bytes).await?;
106 conn.record_send(header_len + bytes.len());
107 }
108 }
109 conn.outstanding_mut().remove(&env.req_id);
110 if let Some(front) = conn.omsg_q_mut().front() {
111 if front.id() == env.req_id {
112 let _ = conn.omsg_q_mut().pop_front();
113 }
114 }
115 }
116 }
117 }
118}
119
120async fn drive_dnode_parser(
121 conn: &mut Conn,
122 handler: &ClientHandler,
123 accumulated: &mut Vec<u8>,
124 parser: &mut DnodeParser,
125) -> Result<(), NetError> {
126 loop {
127 if accumulated.is_empty() {
128 return Ok(());
129 }
130 let step = parser.step(accumulated.as_slice());
131 match step {
132 ParseStep::NeedMore { .. } => return Ok(()),
133 ParseStep::Error { consumed } => {
134 return Err(NetError::Dnode(format!(
135 "dnode header parse error after {consumed} bytes"
136 )));
137 }
138 ParseStep::HeaderDone { consumed } => {
139 let header_end = consumed;
140 let dmsg = parser.take_dmsg();
141 let plen = dmsg.plen as usize;
142 let total = header_end + plen;
143 if accumulated.len() < total {
144 // Wait for more bytes for the payload; rewind
145 // by stashing what we have. The parser was
146 // moved to PostDone but we need it to retry
147 // header parsing on the next chunk.
148 parser.reset();
149 return Ok(());
150 }
151 let payload = accumulated[header_end..total].to_vec();
152 accumulated.drain(0..total);
153 parser.reset();
154
155 // Gossip-class frames are control plane: feed the
156 // sender's identity into the gossip handler's
157 // failure detector and skip the datastore parse
158 // path. Without this fork the datastore parser
159 // sees an opaque ASCII pname (e.g. `127.0.0.1:8101`)
160 // and rejects it with a parse error, causing the
161 // dnode_client_loop to tear the connection down.
162 if is_gossip_ty(dmsg.ty) {
163 handle_gossip_frame(handler, dmsg.ty, &payload);
164 continue;
165 }
166
167 // Decrypt if the dnode header indicates the payload
168 // is encrypted and we have an AES key.
169 let decoded = if dmsg.is_encrypted() {
170 let Some(key) = conn.aes_key() else {
171 // No key has been negotiated yet; the
172 // peer-plane handshake should have run
173 // first. Surface a single opaque parse
174 // error and let the driver close the
175 // connection.
176 return Err(NetError::Dnode(
177 "dnode payload marked encrypted but no aes key bound".into(),
178 ));
179 };
180 decrypt_dnode_payload(key, &payload)?
181 } else {
182 payload
183 };
184
185 // Feed the decoded payload through the datastore
186 // parser to reconstruct a Msg.
187 let mut msg = Msg::new(dmsg.id, MsgType::Unknown, true);
188 let dmsg_ty = dmsg.ty;
189 msg.set_dmsg(dmsg);
190 let parse_result = match handler.data_store() {
191 crate::conf::DataStore::Redis | crate::conf::DataStore::Noxu => {
192 crate::proto::redis::redis_parse_req(&mut msg, &decoded)
193 }
194 crate::conf::DataStore::Memcache => {
195 crate::proto::memcache::memcache_parse_req(&mut msg, &decoded)
196 }
197 };
198 if matches!(dmsg_ty, DmsgType::ReqForward) {
199 // A `ReqForward` is the wire signal that this
200 // request was already routed by an upstream
201 // dispatcher (e.g. a quorum coalescer issuing
202 // a read-repair write back to a divergent
203 // replica). The receiver must NOT re-fan it
204 // out: tag the parsed request as
205 // `LocalNodeOnly` so the cluster dispatcher
206 // hands it straight to its local datastore.
207 msg.set_routing(crate::msg::MsgRouting::LocalNodeOnly);
208 }
209 match parse_result {
210 MsgParseResult::Ok | MsgParseResult::Noop => {
211 let pool = conn.mbuf_pool().clone();
212 let mut buf = pool.get();
213 buf.recv(&decoded);
214 msg.mbufs_mut().push_back(buf);
215 msg.recompute_mlen();
216 conn.outstanding_mut().insert(msg.id(), msg.id());
217 conn.enqueue_out(Msg::new(msg.id(), msg.ty(), true))?;
218 // Hand the parsed peer request to the
219 // configured dispatcher. The dispatcher
220 // either takes ownership and replies
221 // asynchronously through `responder`, or it
222 // returns an inline / error response that
223 // we forward immediately, or it asks the
224 // FSM to drop the request.
225 let outcome = handler
226 .dispatcher()
227 .dispatch(msg, handler.response_tx().clone());
228 match outcome {
229 crate::net::dispatcher::DispatchOutcome::Pending
230 | crate::net::dispatcher::DispatchOutcome::Drop => {}
231 crate::net::dispatcher::DispatchOutcome::Inline(rsp)
232 | crate::net::dispatcher::DispatchOutcome::Error(rsp) => {
233 let env = OutboundEnvelope {
234 req_id: rsp.id(),
235 rsp,
236 span: tracing::Span::current(),
237 source_peer_idx: None,
238 };
239 let _ = handler.response_tx().send(env).await;
240 }
241 }
242 }
243 MsgParseResult::Again => return Ok(()),
244 other => {
245 return Err(NetError::Parse(format!("dnode payload parse: {other:?}")));
246 }
247 }
248 }
249 }
250 }
251}
252
253/// Decrypt a dnode peer-plane payload using the per-connection AES
254/// key.
255///
256/// AES-128-CBC with PKCS#7 padding, IV from the trailing 16 bytes
257/// of the 32-byte key buffer. Returns a single opaque
258/// [`NetError::Dnode`] on failure regardless of whether the
259/// underlying error was bad padding, a length mismatch, or a
260/// key/iv mismatch, so peers cannot distinguish the cases (the
261/// padding-oracle surface flagged in the Stage 6 review).
262fn decrypt_dnode_payload(
263 key: &[u8; crate::crypto::AES_KEYLEN],
264 payload: &[u8],
265) -> Result<Vec<u8>, NetError> {
266 crate::crypto::Crypto::aes_decrypt(payload, key)
267 .map_err(|_| NetError::Dnode("dnode payload decrypt failed".into()))
268}
269
270/// True for any dnode message type that belongs to the gossip
271/// control plane. The data plane (`Req`, `ReqForward`, `Res`,
272/// `CryptoHandshake`, `Unknown`, `Debug`, `ParseError`) returns
273/// `false`.
274fn is_gossip_ty(ty: DmsgType) -> bool {
275 matches!(
276 ty,
277 DmsgType::GossipSyn
278 | DmsgType::GossipSynReply
279 | DmsgType::GossipAck
280 | DmsgType::GossipDigestSyn
281 | DmsgType::GossipDigestAck
282 | DmsgType::GossipDigestAck2
283 | DmsgType::GossipShutdown
284 )
285}
286
287/// Process a gossip control-plane frame. The payload is the
288/// sender peer's `host:port` (ASCII). Heartbeat-class frames
289/// feed the failure detector; `GossipShutdown` immediately
290/// transitions the sender to [`crate::cluster::peer::PeerState::Down`].
291///
292/// Frames received before the run loop has attached a gossip
293/// handler are silently dropped; this matches the reference
294/// engine's behaviour of ignoring stray gossip while the
295/// failure detector is still being constructed.
296fn handle_gossip_frame(handler: &ClientHandler, ty: DmsgType, payload: &[u8]) {
297 let Some(gossip) = handler.gossip() else {
298 return;
299 };
300 let Ok(pname) = std::str::from_utf8(payload) else {
301 return;
302 };
303 let pname = pname.trim();
304 if pname.is_empty() {
305 return;
306 }
307 let now = std::time::Instant::now();
308 match ty {
309 DmsgType::GossipShutdown => {
310 gossip.mark_down_pname(pname);
311 }
312 _ => {
313 gossip.record_heartbeat_pname(pname, now);
314 }
315 }
316}
317
#[cfg(test)]
mod tests {
    use super::*;
    use crate::io::reactor::{ConnRole, TcpTransport};
    use tokio::net::{TcpListener, TcpStream};

    /// Smoke test: a `Conn` in the `DnodePeerClient` role can be built
    /// over a real loopback TCP stream and dropped again.
    #[tokio::test]
    async fn build_and_drop() {
        // Bind to an ephemeral loopback port and accept exactly one
        // connection in the background, closing it straight away.
        let server = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let bound_addr = server.local_addr().unwrap();
        let _acceptor = tokio::spawn(async move {
            let (peer_stream, _) = server.accept().await.unwrap();
            drop(peer_stream);
        });

        let stream = TcpStream::connect(bound_addr).await.unwrap();
        let _conn = Conn::new(
            Box::new(TcpTransport::new(stream, ConnRole::DnodePeerClient)),
            ConnRole::DnodePeerClient,
        );
    }
}
338}