// nodedb_cluster/transport/server.rs
1// SPDX-License-Identifier: BUSL-1.1
2
3//! Inbound Raft RPC handling.
4//!
5//! Accepts connections from the QUIC endpoint, dispatches incoming bidi
6//! streams to a [`RaftRpcHandler`], and writes back the response frame.
7//!
8//! # Authenticated wire envelope
9//!
10//! Every on-wire message is an [`auth_envelope`]-wrapped
11//! [`rpc_codec`] frame. The envelope carries `from_node_id`, a per-peer
12//! monotonic `seq`, and an HMAC-SHA256 MAC. [`handle_stream`]:
13//!
14//! 1. reads one envelope from the QUIC stream,
15//! 2. verifies the MAC against the cluster MAC key held by
16//!    [`AuthContext`],
17//! 3. rejects replays via the per-peer sliding window,
18//!    3b. verifies the TLS peer certificate identity against the topology pin,
19//! 4. decodes the inner frame and dispatches to the handler,
20//! 5. wraps the handler's response in its own authenticated envelope
21//!    with `from_node_id = local_node_id` and a fresh outbound seq for
22//!    the caller's id.
23//!
24//! # Cooperative shutdown
25//!
26//! Every long-lived `.await` is wrapped in a `tokio::select!` over a
27//! `watch::Receiver<bool>` shutdown signal that is cloned into every
28//! spawned child task, so graceful shutdown promptly releases handler
29//! Arcs held by grandchild per-stream tasks.
30//!
31//! [`auth_envelope`]: crate::rpc_codec::auth_envelope
32//! [`rpc_codec`]: crate::rpc_codec
33
34use std::sync::Arc;
35
36use rustls::pki_types::CertificateDer;
37use tokio::sync::watch;
38use tracing::{debug, warn};
39
40use crate::error::{ClusterError, Result};
41use crate::rpc_codec::{self, MAX_RPC_PAYLOAD_SIZE, RaftRpc, auth_envelope};
42use crate::topology::NodeInfo;
43use crate::transport::auth_context::AuthContext;
44use crate::transport::peer_identity_verifier::{
45    IDENTITY_MISMATCH_QUIC_ERROR, VerifyOutcome, verify_peer_identity,
46};
47use crate::wire_version::handshake_io::{local_version_range, perform_version_handshake_server};
48
/// Topology-decoupled lookup for per-node identity pins.
///
/// The server needs to check the TLS peer cert against the pinned
/// `NodeInfo` for the MAC-verified `from_node_id`, but it must not
/// take a direct dependency on `ClusterState` or `ClusterTopology`
/// (which would create a circular crate dependency and would be hard
/// to test).  Implementors wrap whatever topology store they have.
///
/// The [`NoopIdentityStore`] below is used in insecure-transport mode
/// and in unit tests that do not exercise the identity layer.
///
/// The `find_by_spki` and `find_by_spiffe` methods are called by the
/// TLS-layer [`PinnedClientVerifier`] and [`PinnedServerVerifier`]
/// during the QUIC handshake (before `node_id` is known from the MAC
/// envelope). They search the topology by the cert's identity rather
/// than by node_id.
///
/// [`PinnedClientVerifier`]: crate::transport::config::PinnedClientVerifier
/// [`PinnedServerVerifier`]: crate::transport::config::PinnedServerVerifier
pub trait PeerIdentityStore: Send + Sync + 'static {
    /// Return the `NodeInfo` for the given node_id, or `None` if
    /// the node is not in the topology (treat as bootstrap window).
    fn get_node_info(&self, node_id: u64) -> Option<NodeInfo>;

    /// Return the `NodeInfo` for the node whose pinned SPKI fingerprint
    /// matches `spki`, or `None` if no node in the topology has that pin.
    ///
    /// Used by the TLS-layer verifiers during the handshake (before the
    /// MAC envelope reveals `node_id`).
    fn find_by_spki(&self, spki: &[u8; 32]) -> Option<NodeInfo>;

    /// Return the `NodeInfo` for the node whose pinned SPIFFE id matches
    /// `spiffe_id`, or `None` if no node has that id.
    ///
    /// Used by the TLS-layer verifiers during the handshake.
    fn find_by_spiffe(&self, spiffe_id: &str) -> Option<NodeInfo>;
}
86
87/// Always returns `None`, accepting every peer as a bootstrap window.
88///
89/// Used when mTLS is disabled (insecure transport) or in unit tests
90/// that focus on HMAC / codec layers rather than identity binding.
91pub struct NoopIdentityStore;
92
93impl PeerIdentityStore for NoopIdentityStore {
94    fn get_node_info(&self, _node_id: u64) -> Option<NodeInfo> {
95        None
96    }
97
98    fn find_by_spki(&self, _spki: &[u8; 32]) -> Option<NodeInfo> {
99        None
100    }
101
102    fn find_by_spiffe(&self, _spiffe_id: &str) -> Option<NodeInfo> {
103        None
104    }
105}
106
/// Trait for handling incoming Raft RPCs.
///
/// Implementors receive a request [`RaftRpc`] and return the corresponding
/// response variant. The transport calls this for each incoming bidi stream.
pub trait RaftRpcHandler: Send + Sync + 'static {
    /// Process one decoded request and produce its response RPC.
    ///
    /// Called by [`handle_stream`] after MAC verification, replay
    /// rejection, and identity pinning have all passed. The returned
    /// future must be `Send` because each stream runs in a spawned task.
    /// An `Err` aborts the stream without writing a response frame.
    fn handle_rpc(&self, rpc: RaftRpc)
    -> impl std::future::Future<Output = Result<RaftRpc>> + Send;
}
115
116/// Extract the peer's leaf certificate DER bytes from a QUIC connection.
117///
118/// Returns `None` if the peer did not present a certificate (insecure
119/// transport) or if the runtime-type downcast fails.
120fn peer_leaf_cert_der(conn: &quinn::Connection) -> Option<Vec<u8>> {
121    let identity = conn.peer_identity()?;
122    let certs: &Vec<CertificateDer<'static>> = identity.downcast_ref()?;
123    certs.first().map(|c| c.as_ref().to_vec())
124}
125
126/// Handle all bidi streams on a single connection.
127///
128/// Exits cleanly (Ok) on shutdown, on normal connection close,
129/// or on unrecoverable transport error.
130pub(crate) async fn handle_connection<H: RaftRpcHandler, S: PeerIdentityStore>(
131    conn: quinn::Connection,
132    handler: Arc<H>,
133    auth: Arc<AuthContext>,
134    identity_store: Arc<S>,
135    mut shutdown: watch::Receiver<bool>,
136) -> Result<()> {
137    // Extract the peer cert once per connection; it does not change.
138    let peer_cert_der: Option<Vec<u8>> = peer_leaf_cert_der(&conn);
139    let peer_addr = conn.remote_address();
140
141    // Perform the wire-version handshake on the first bidi stream before
142    // dispatching any RPCs. The client opens a dedicated stream for this
143    // exchange; subsequent streams on the same connection are RPC streams.
144    let agreed_version = {
145        let accepted = tokio::select! {
146            biased;
147            _ = shutdown.changed() => {
148                if *shutdown.borrow() {
149                    return Ok(());
150                }
151                // Spurious change — retry the accept.
152                conn.accept_bi().await
153            }
154            result = conn.accept_bi() => result,
155        };
156
157        let (mut hs_send, mut hs_recv) = match accepted {
158            Ok(streams) => streams,
159            Err(quinn::ConnectionError::ApplicationClosed(_)) => return Ok(()),
160            Err(quinn::ConnectionError::LocallyClosed) => return Ok(()),
161            Err(e) => {
162                return Err(ClusterError::Transport {
163                    detail: format!("accept handshake stream from {peer_addr}: {e}"),
164                });
165            }
166        };
167
168        let local = local_version_range();
169        match perform_version_handshake_server(&conn, &mut hs_send, &mut hs_recv).await {
170            Ok(v) => v,
171            Err(e) => {
172                warn!(
173                    peer_addr = %peer_addr,
174                    local_min = %local.min,
175                    local_max = %local.max,
176                    error = %e,
177                    "wire version handshake failed; closing connection"
178                );
179                // perform_version_handshake_server already closed the QUIC
180                // connection on range mismatch; propagate the error so the
181                // caller logs it and the connection task exits.
182                return Err(e);
183            }
184        }
185    };
186
187    debug!(
188        peer_addr = %peer_addr,
189        agreed_version = %agreed_version,
190        "wire version handshake complete"
191    );
192
193    loop {
194        let accepted = tokio::select! {
195            biased;
196            _ = shutdown.changed() => {
197                if *shutdown.borrow() {
198                    return Ok(());
199                }
200                continue;
201            }
202            result = conn.accept_bi() => result,
203        };
204
205        let (send, recv) = match accepted {
206            Ok(streams) => streams,
207            Err(quinn::ConnectionError::ApplicationClosed(_)) => return Ok(()),
208            Err(quinn::ConnectionError::LocallyClosed) => return Ok(()),
209            Err(e) => {
210                return Err(ClusterError::Transport {
211                    detail: format!("accept_bi: {e}"),
212                });
213            }
214        };
215
216        let h = handler.clone();
217        let stream_shutdown = shutdown.clone();
218        let stream_auth = auth.clone();
219        let stream_id_store = identity_store.clone();
220        let stream_cert = peer_cert_der.clone();
221        let conn_clone = conn.clone();
222        tokio::spawn(async move {
223            if let Err(e) = handle_stream(
224                h,
225                stream_auth,
226                stream_id_store,
227                stream_cert,
228                conn_clone,
229                send,
230                recv,
231                stream_shutdown,
232            )
233            .await
234            {
235                debug!(error = %e, "raft RPC stream error");
236            }
237        });
238    }
239}
240
241/// Handle a single bidi stream: read request → dispatch → write response.
242///
243/// Every long-lived await is racing a shutdown signal — see the
244/// module docstring for the rationale.
245#[allow(clippy::too_many_arguments)]
246async fn handle_stream<H: RaftRpcHandler, S: PeerIdentityStore>(
247    handler: Arc<H>,
248    auth: Arc<AuthContext>,
249    identity_store: Arc<S>,
250    peer_cert_der: Option<Vec<u8>>,
251    conn: quinn::Connection,
252    mut send: quinn::SendStream,
253    mut recv: quinn::RecvStream,
254    mut shutdown: watch::Receiver<bool>,
255) -> Result<()> {
256    let work = async {
257        // 1. Read one envelope.
258        let envelope = read_envelope(&mut recv).await?;
259        let (fields, inner_frame) = auth_envelope::parse_envelope(&envelope, &auth.mac_key)?;
260
261        // 2. Replay window — under the advertised from_node_id (MAC-verified).
262        //    Self-addressed frames skip the window: when a node dispatches
263        //    an RPC to itself over the transport, the shared `AuthContext`
264        //    means one window is updated by both the server-side request
265        //    accept (here) and the client-side response accept (in
266        //    `send.rs::parse_inbound`). Skipping when `from == local`
267        //    keeps the two flows from tripping on each other's entries —
268        //    a self-addressed frame can't have been replayed by an
269        //    external attacker by definition.
270        if fields.from_node_id != auth.local_node_id {
271            auth.peer_seq_in.accept(fields.from_node_id, fields.seq)?;
272        }
273
274        // 3b. Peer identity check — binds the MAC-verified node_id to the
275        //     TLS certificate.  Self-addressed frames skip the check by the
276        //     same reasoning as the replay window above.
277        if fields.from_node_id != auth.local_node_id
278            && let Some(cert_der) = &peer_cert_der
279        {
280            let node_info = identity_store.get_node_info(fields.from_node_id);
281            match node_info {
282                Some(ref info) => match verify_peer_identity(info, cert_der) {
283                    VerifyOutcome::Accepted { method } => {
284                        debug!(
285                            node_id = fields.from_node_id,
286                            ?method,
287                            "peer identity verified"
288                        );
289                    }
290                    VerifyOutcome::BootstrapAccepted => {
291                        warn!(
292                            node_id = fields.from_node_id,
293                            "peer identity not pinned — bootstrap window accepted"
294                        );
295                    }
296                    VerifyOutcome::Rejected => {
297                        warn!(
298                            node_id = fields.from_node_id,
299                            "peer identity mismatch — closing connection"
300                        );
301                        conn.close(IDENTITY_MISMATCH_QUIC_ERROR, b"peer identity mismatch");
302                        return Err(ClusterError::Transport {
303                            detail: format!(
304                                "peer identity mismatch for node {}",
305                                fields.from_node_id
306                            ),
307                        });
308                    }
309                },
310                None => {
311                    // Node not yet in topology — bootstrap window.
312                    warn!(
313                        node_id = fields.from_node_id,
314                        "node not in topology — bootstrap window accepted"
315                    );
316                }
317            }
318        }
319
320        // 4. Decode inner RPC and hand to handler.
321        let request = rpc_codec::decode(inner_frame)?;
322        let response = handler.handle_rpc(request).await?;
323
324        // 5. Wrap the response in its own envelope. `from = local_node_id`,
325        //    `seq = next outbound seq scoped to the caller`.
326        let response_inner = rpc_codec::encode(&response)?;
327        let response_seq = auth.peer_seq_out.next();
328        let mut response_envelope =
329            Vec::with_capacity(auth_envelope::ENVELOPE_OVERHEAD + response_inner.len());
330        auth_envelope::write_envelope(
331            auth.local_node_id,
332            response_seq,
333            &response_inner,
334            &auth.mac_key,
335            &mut response_envelope,
336        )?;
337
338        send.write_all(&response_envelope)
339            .await
340            .map_err(|e| ClusterError::Transport {
341                detail: format!("write response: {e}"),
342            })?;
343        send.finish().map_err(|e| ClusterError::Transport {
344            detail: format!("finish response: {e}"),
345        })?;
346        Ok::<(), ClusterError>(())
347    };
348
349    tokio::select! {
350        biased;
351        _ = shutdown.changed() => Ok(()),
352        result = work => result,
353    }
354}
355
356/// Read a complete authenticated envelope from a QUIC receive stream.
357///
358/// Reads the fixed envelope pre-header (version + from_node_id + seq +
359/// inner_len), then the inner frame, then the MAC tag. Returns the full
360/// envelope bytes for caller-side parsing.
361pub(crate) async fn read_envelope(recv: &mut quinn::RecvStream) -> Result<Vec<u8>> {
362    // Envelope header is version(1) + from_node_id(8) + seq(8) + inner_len(4).
363    const ENV_HDR_LEN: usize = 21;
364
365    let mut hdr = [0u8; ENV_HDR_LEN];
366    recv.read_exact(&mut hdr)
367        .await
368        .map_err(|e| ClusterError::Transport {
369            detail: format!("read envelope header: {e}"),
370        })?;
371
372    let inner_len = u32::from_le_bytes([hdr[17], hdr[18], hdr[19], hdr[20]]);
373    if inner_len > MAX_RPC_PAYLOAD_SIZE {
374        return Err(ClusterError::Codec {
375            detail: format!(
376                "envelope inner length {inner_len} exceeds maximum {MAX_RPC_PAYLOAD_SIZE}"
377            ),
378        });
379    }
380
381    let total = ENV_HDR_LEN + inner_len as usize + rpc_codec::MAC_LEN;
382    let mut buf = vec![0u8; total];
383    buf[..ENV_HDR_LEN].copy_from_slice(&hdr);
384    if total > ENV_HDR_LEN {
385        recv.read_exact(&mut buf[ENV_HDR_LEN..])
386            .await
387            .map_err(|e| ClusterError::Transport {
388                detail: format!("read envelope payload+mac: {e}"),
389            })?;
390    }
391
392    Ok(buf)
393}