Skip to main content

dynomite/net/
dnode_client.rs

//! Inbound peer-connection driver for the dnode peer plane.
//!
//! The local node is the receiver. The driver:
//!
//! 1. Reads bytes off the transport into a contiguous buffer.
//! 2. Drives the dnode header parser ([`crate::proto::dnode::DnodeParser`])
//!    over the buffer until a full `Dmsg` header and the payload length it
//!    announces have both been received. Gossip control-plane frames are
//!    handed to the gossip handler at this point and skip the steps below.
//! 3. If the header marks the payload as encrypted, decrypts it
//!    using the per-connection AES key bound during the handshake
//!    via [`crate::crypto::Crypto`]. When the header indicates a
//!    plaintext payload (the peer plane was negotiated unsecured),
//!    the bytes pass through unchanged.
//! 4. Drives the datastore parser over the (decrypted) payload to
//!    reconstruct a [`Msg`].
//! 5. Hands the parsed [`Msg`] to the supplied [`ClientHandler`]'s
//!    dispatcher and writes the dispatcher's response back to the peer
//!    as a dnode `Res` frame.
18
19use tokio::io::{AsyncReadExt, AsyncWriteExt};
20use tokio::sync::mpsc;
21
22use crate::msg::Msg;
23use crate::msg::MsgParseResult;
24use crate::msg::MsgType;
25use crate::net::client::ClientHandler;
26use crate::net::conn::Conn;
27use crate::net::dispatcher::OutboundEnvelope;
28use crate::net::NetError;
29use crate::proto::dnode::{DmsgType, DnodeParser, ParseStep};
30
31/// Type alias for the dnode client handler bundle.
32pub type DnodeClientHandler = ClientHandler;
33
34/// Drive a DNODE_PEER_CLIENT FSM until the peer closes.
35///
36/// `rx` receives responses produced by the cluster dispatcher; the
37/// driver writes the response bytes back through the same
38/// transport.
39///
40/// # Errors
41/// Surfaces transport- and DNODE-level errors.
42pub async fn dnode_client_loop(
43    mut conn: Conn,
44    handler: ClientHandler,
45    mut rx: mpsc::Receiver<OutboundEnvelope>,
46) -> Result<(), NetError> {
47    let mut read_buf = vec![0u8; 4096];
48    let mut accumulated = Vec::<u8>::new();
49    let mut parser = DnodeParser::new();
50
51    loop {
52        if conn.is_eof() && conn.imsg_q().is_empty() && conn.omsg_q().is_empty() {
53            conn.set_done();
54            return Ok(());
55        }
56
57        tokio::select! {
58            res = async {
59                if let Some(t) = conn.transport_mut() {
60                    t.read(&mut read_buf).await
61                } else {
62                    Ok(0)
63                }
64            } => {
65                let n = res?;
66                if n == 0 {
67                    conn.set_eof();
68                    continue;
69                }
70                conn.record_recv(n);
71                accumulated.extend_from_slice(&read_buf[..n]);
72                drive_dnode_parser(&mut conn, &handler, &mut accumulated, &mut parser).await?;
73            }
74            Some(env) = rx.recv() => {
75                // Forward dispatcher-produced responses back to
76                // the peer over this same transport. The peer-
77                // originator's `DnodeServerConn` parses incoming
78                // bytes with `DnodeParser`, so the response must
79                // be dnode-framed (header + payload). Without
80                // this header the originator's parser hangs in
81                // `NeedMore`, the dispatcher's `responder` mpsc
82                // never gets the reply, and the originating
83                // client times out.
84                let bytes: Vec<u8> = env
85                    .rsp
86                    .mbufs()
87                    .iter()
88                    .flat_map(|b| b.readable().to_vec())
89                    .collect();
90                if !bytes.is_empty() {
91                    let mut header_buf = conn.mbuf_pool().get();
92                    crate::proto::dnode::dmsg_write(
93                        &mut header_buf,
94                        env.req_id,
95                        crate::proto::dnode::DmsgType::Res,
96                        0,
97                        true,
98                        None,
99                        u32::try_from(bytes.len()).unwrap_or(u32::MAX),
100                    )
101                    .map_err(|e| NetError::Dnode(format!("{e:?}")))?;
102                    let header_len = header_buf.readable().len();
103                    if let Some(t) = conn.transport_mut() {
104                        t.write_all(header_buf.readable()).await?;
105                        t.write_all(&bytes).await?;
106                        conn.record_send(header_len + bytes.len());
107                    }
108                }
109                conn.outstanding_mut().remove(&env.req_id);
110                if let Some(front) = conn.omsg_q_mut().front() {
111                    if front.id() == env.req_id {
112                        let _ = conn.omsg_q_mut().pop_front();
113                    }
114                }
115            }
116        }
117    }
118}
119
120async fn drive_dnode_parser(
121    conn: &mut Conn,
122    handler: &ClientHandler,
123    accumulated: &mut Vec<u8>,
124    parser: &mut DnodeParser,
125) -> Result<(), NetError> {
126    loop {
127        if accumulated.is_empty() {
128            return Ok(());
129        }
130        let step = parser.step(accumulated.as_slice());
131        match step {
132            ParseStep::NeedMore { .. } => return Ok(()),
133            ParseStep::Error { consumed } => {
134                return Err(NetError::Dnode(format!(
135                    "dnode header parse error after {consumed} bytes"
136                )));
137            }
138            ParseStep::HeaderDone { consumed } => {
139                let header_end = consumed;
140                let dmsg = parser.take_dmsg();
141                let plen = dmsg.plen as usize;
142                let total = header_end + plen;
143                if accumulated.len() < total {
144                    // Wait for more bytes for the payload; rewind
145                    // by stashing what we have. The parser was
146                    // moved to PostDone but we need it to retry
147                    // header parsing on the next chunk.
148                    parser.reset();
149                    return Ok(());
150                }
151                let payload = accumulated[header_end..total].to_vec();
152                accumulated.drain(0..total);
153                parser.reset();
154
155                // Gossip-class frames are control plane: feed the
156                // sender's identity into the gossip handler's
157                // failure detector and skip the datastore parse
158                // path. Without this fork the datastore parser
159                // sees an opaque ASCII pname (e.g. `127.0.0.1:8101`)
160                // and rejects it with a parse error, causing the
161                // dnode_client_loop to tear the connection down.
162                if is_gossip_ty(dmsg.ty) {
163                    handle_gossip_frame(handler, dmsg.ty, &payload);
164                    continue;
165                }
166
167                // Decrypt if the dnode header indicates the payload
168                // is encrypted and we have an AES key.
169                let decoded = if dmsg.is_encrypted() {
170                    let Some(key) = conn.aes_key() else {
171                        // No key has been negotiated yet; the
172                        // peer-plane handshake should have run
173                        // first. Surface a single opaque parse
174                        // error and let the driver close the
175                        // connection.
176                        return Err(NetError::Dnode(
177                            "dnode payload marked encrypted but no aes key bound".into(),
178                        ));
179                    };
180                    decrypt_dnode_payload(key, &payload)?
181                } else {
182                    payload
183                };
184
185                // Feed the decoded payload through the datastore
186                // parser to reconstruct a Msg.
187                let mut msg = Msg::new(dmsg.id, MsgType::Unknown, true);
188                let dmsg_ty = dmsg.ty;
189                msg.set_dmsg(dmsg);
190                let parse_result = match handler.data_store() {
191                    crate::conf::DataStore::Redis | crate::conf::DataStore::Noxu => {
192                        crate::proto::redis::redis_parse_req(&mut msg, &decoded)
193                    }
194                    crate::conf::DataStore::Memcache => {
195                        crate::proto::memcache::memcache_parse_req(&mut msg, &decoded)
196                    }
197                };
198                if matches!(dmsg_ty, DmsgType::ReqForward) {
199                    // A `ReqForward` is the wire signal that this
200                    // request was already routed by an upstream
201                    // dispatcher (e.g. a quorum coalescer issuing
202                    // a read-repair write back to a divergent
203                    // replica). The receiver must NOT re-fan it
204                    // out: tag the parsed request as
205                    // `LocalNodeOnly` so the cluster dispatcher
206                    // hands it straight to its local datastore.
207                    msg.set_routing(crate::msg::MsgRouting::LocalNodeOnly);
208                }
209                match parse_result {
210                    MsgParseResult::Ok | MsgParseResult::Noop => {
211                        let pool = conn.mbuf_pool().clone();
212                        let mut buf = pool.get();
213                        buf.recv(&decoded);
214                        msg.mbufs_mut().push_back(buf);
215                        msg.recompute_mlen();
216                        conn.outstanding_mut().insert(msg.id(), msg.id());
217                        conn.enqueue_out(Msg::new(msg.id(), msg.ty(), true))?;
218                        // Hand the parsed peer request to the
219                        // configured dispatcher. The dispatcher
220                        // either takes ownership and replies
221                        // asynchronously through `responder`, or it
222                        // returns an inline / error response that
223                        // we forward immediately, or it asks the
224                        // FSM to drop the request.
225                        let outcome = handler
226                            .dispatcher()
227                            .dispatch(msg, handler.response_tx().clone());
228                        match outcome {
229                            crate::net::dispatcher::DispatchOutcome::Pending
230                            | crate::net::dispatcher::DispatchOutcome::Drop => {}
231                            crate::net::dispatcher::DispatchOutcome::Inline(rsp)
232                            | crate::net::dispatcher::DispatchOutcome::Error(rsp) => {
233                                let env = OutboundEnvelope {
234                                    req_id: rsp.id(),
235                                    rsp,
236                                    span: tracing::Span::current(),
237                                    source_peer_idx: None,
238                                };
239                                let _ = handler.response_tx().send(env).await;
240                            }
241                        }
242                    }
243                    MsgParseResult::Again => return Ok(()),
244                    other => {
245                        return Err(NetError::Parse(format!("dnode payload parse: {other:?}")));
246                    }
247                }
248            }
249        }
250    }
251}
252
253/// Decrypt a dnode peer-plane payload using the per-connection AES
254/// key.
255///
256/// AES-128-CBC with PKCS#7 padding, IV from the trailing 16 bytes
257/// of the 32-byte key buffer. Returns a single opaque
258/// [`NetError::Dnode`] on failure regardless of whether the
259/// underlying error was bad padding, a length mismatch, or a
260/// key/iv mismatch, so peers cannot distinguish the cases (the
261/// padding-oracle surface flagged in the Stage 6 review).
262fn decrypt_dnode_payload(
263    key: &[u8; crate::crypto::AES_KEYLEN],
264    payload: &[u8],
265) -> Result<Vec<u8>, NetError> {
266    crate::crypto::Crypto::aes_decrypt(payload, key)
267        .map_err(|_| NetError::Dnode("dnode payload decrypt failed".into()))
268}
269
270/// True for any dnode message type that belongs to the gossip
271/// control plane. The data plane (`Req`, `ReqForward`, `Res`,
272/// `CryptoHandshake`, `Unknown`, `Debug`, `ParseError`) returns
273/// `false`.
274fn is_gossip_ty(ty: DmsgType) -> bool {
275    matches!(
276        ty,
277        DmsgType::GossipSyn
278            | DmsgType::GossipSynReply
279            | DmsgType::GossipAck
280            | DmsgType::GossipDigestSyn
281            | DmsgType::GossipDigestAck
282            | DmsgType::GossipDigestAck2
283            | DmsgType::GossipShutdown
284    )
285}
286
287/// Process a gossip control-plane frame. The payload is the
288/// sender peer's `host:port` (ASCII). Heartbeat-class frames
289/// feed the failure detector; `GossipShutdown` immediately
290/// transitions the sender to [`crate::cluster::peer::PeerState::Down`].
291///
292/// Frames received before the run loop has attached a gossip
293/// handler are silently dropped; this matches the reference
294/// engine's behaviour of ignoring stray gossip while the
295/// failure detector is still being constructed.
296fn handle_gossip_frame(handler: &ClientHandler, ty: DmsgType, payload: &[u8]) {
297    let Some(gossip) = handler.gossip() else {
298        return;
299    };
300    let Ok(pname) = std::str::from_utf8(payload) else {
301        return;
302    };
303    let pname = pname.trim();
304    if pname.is_empty() {
305        return;
306    }
307    let now = std::time::Instant::now();
308    match ty {
309        DmsgType::GossipShutdown => {
310            gossip.mark_down_pname(pname);
311        }
312        _ => {
313            gossip.record_heartbeat_pname(pname, now);
314        }
315    }
316}
317
#[cfg(test)]
mod tests {
    use super::*;
    use crate::io::reactor::{ConnRole, TcpTransport};
    use tokio::net::{TcpListener, TcpStream};

    /// A `Conn` wrapping a live loopback TCP stream can be built in the
    /// dnode peer-client role and dropped without panicking.
    #[tokio::test]
    async fn build_and_drop() {
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let server_addr = listener.local_addr().unwrap();
        let _acceptor = tokio::spawn(async move {
            let (accepted, _peer_addr) = listener.accept().await.unwrap();
            drop(accepted);
        });

        let client = TcpStream::connect(server_addr).await.unwrap();
        let transport = Box::new(TcpTransport::new(client, ConnRole::DnodePeerClient));
        let _conn = Conn::new(transport, ConnRole::DnodePeerClient);
    }
}