// nodedb_cluster/transport/server.rs
// SPDX-License-Identifier: BUSL-1.1

//! Inbound Raft RPC handling.
//!
//! Accepts connections from the QUIC endpoint, dispatches incoming bidi
//! streams to a [`RaftRpcHandler`], and writes back the response frame.
//!
//! # Authenticated wire envelope
//!
//! Every on-wire message is an [`auth_envelope`]-wrapped
//! [`rpc_codec`] frame. The envelope carries `from_node_id`, a per-peer
//! monotonic `seq`, and an HMAC-SHA256 MAC. [`handle_stream`]:
//!
//! 1. reads one envelope from the QUIC stream,
//! 2. verifies the MAC against the cluster MAC key held by
//!    [`AuthContext`],
//! 3. rejects replays via the per-peer sliding window,
//! 3b. verifies the TLS peer certificate identity against the topology pin,
//! 4. decodes the inner frame and dispatches to the handler,
//! 5. wraps the handler's response in its own authenticated envelope
//!    with `from_node_id = local_node_id` and a fresh outbound seq for
//!    the caller's id.
//!
//! # Cooperative shutdown
//!
//! Every long-lived `.await` is wrapped in a `tokio::select!` over a
//! `watch::Receiver<bool>` shutdown signal that is cloned into every
//! spawned child task, so graceful shutdown promptly releases handler
//! Arcs held by grandchild per-stream tasks.
//!
//! [`auth_envelope`]: crate::rpc_codec::auth_envelope
//! [`rpc_codec`]: crate::rpc_codec

34use std::sync::Arc;
35
36use rustls::pki_types::CertificateDer;
37use tokio::sync::watch;
38use tracing::{debug, warn};
39
40use crate::error::{ClusterError, Result};
41use crate::rpc_codec::{self, MAX_RPC_PAYLOAD_SIZE, RaftRpc, auth_envelope};
42use crate::topology::NodeInfo;
43use crate::transport::auth_context::AuthContext;
44use crate::transport::peer_identity_verifier::{
45 IDENTITY_MISMATCH_QUIC_ERROR, VerifyOutcome, verify_peer_identity,
46};
47use crate::wire_version::handshake_io::{local_version_range, perform_version_handshake_server};
48
49/// Topology-decoupled lookup for per-node identity pins.
50///
51/// The server needs to check the TLS peer cert against the pinned
52/// `NodeInfo` for the MAC-verified `from_node_id`, but it must not
53/// take a direct dependency on `ClusterState` or `ClusterTopology`
54/// (which would create a circular crate dependency and would be hard
55/// to test). Implementors wrap whatever topology store they have.
56///
57/// The `NoopIdentityStore` below is used in insecure-transport mode
58/// and in unit tests that do not exercise the identity layer.
59///
60/// The `find_by_spki` and `find_by_spiffe` methods are called by the
61/// TLS-layer [`PinnedClientVerifier`] and [`PinnedServerVerifier`]
62/// during the QUIC handshake (before `node_id` is known from the MAC
63/// envelope). They search the topology by the cert's identity rather
64/// than by node_id.
65///
66/// [`PinnedClientVerifier`]: crate::transport::config::PinnedClientVerifier
67/// [`PinnedServerVerifier`]: crate::transport::config::PinnedServerVerifier
68pub trait PeerIdentityStore: Send + Sync + 'static {
69 /// Return the `NodeInfo` for the given node_id, or `None` if
70 /// the node is not in the topology (treat as bootstrap window).
71 fn get_node_info(&self, node_id: u64) -> Option<NodeInfo>;
72
73 /// Return the `NodeInfo` for the node whose pinned SPKI fingerprint
74 /// matches `spki`, or `None` if no node in the topology has that pin.
75 ///
76 /// Used by the TLS-layer verifiers during the handshake (before the
77 /// MAC envelope reveals `node_id`).
78 fn find_by_spki(&self, spki: &[u8; 32]) -> Option<NodeInfo>;
79
80 /// Return the `NodeInfo` for the node whose pinned SPIFFE id matches
81 /// `spiffe_id`, or `None` if no node has that id.
82 ///
83 /// Used by the TLS-layer verifiers during the handshake.
84 fn find_by_spiffe(&self, spiffe_id: &str) -> Option<NodeInfo>;
85}
86
87/// Always returns `None`, accepting every peer as a bootstrap window.
88///
89/// Used when mTLS is disabled (insecure transport) or in unit tests
90/// that focus on HMAC / codec layers rather than identity binding.
91pub struct NoopIdentityStore;
92
93impl PeerIdentityStore for NoopIdentityStore {
94 fn get_node_info(&self, _node_id: u64) -> Option<NodeInfo> {
95 None
96 }
97
98 fn find_by_spki(&self, _spki: &[u8; 32]) -> Option<NodeInfo> {
99 None
100 }
101
102 fn find_by_spiffe(&self, _spiffe_id: &str) -> Option<NodeInfo> {
103 None
104 }
105}
106
107/// Trait for handling incoming Raft RPCs.
108///
109/// Implementors receive a request [`RaftRpc`] and return the corresponding
110/// response variant. The transport calls this for each incoming bidi stream.
111pub trait RaftRpcHandler: Send + Sync + 'static {
112 fn handle_rpc(&self, rpc: RaftRpc)
113 -> impl std::future::Future<Output = Result<RaftRpc>> + Send;
114}
115
116/// Extract the peer's leaf certificate DER bytes from a QUIC connection.
117///
118/// Returns `None` if the peer did not present a certificate (insecure
119/// transport) or if the runtime-type downcast fails.
120fn peer_leaf_cert_der(conn: &quinn::Connection) -> Option<Vec<u8>> {
121 let identity = conn.peer_identity()?;
122 let certs: &Vec<CertificateDer<'static>> = identity.downcast_ref()?;
123 certs.first().map(|c| c.as_ref().to_vec())
124}
125
126/// Handle all bidi streams on a single connection.
127///
128/// Exits cleanly (Ok) on shutdown, on normal connection close,
129/// or on unrecoverable transport error.
130pub(crate) async fn handle_connection<H: RaftRpcHandler, S: PeerIdentityStore>(
131 conn: quinn::Connection,
132 handler: Arc<H>,
133 auth: Arc<AuthContext>,
134 identity_store: Arc<S>,
135 mut shutdown: watch::Receiver<bool>,
136) -> Result<()> {
137 // Extract the peer cert once per connection; it does not change.
138 let peer_cert_der: Option<Vec<u8>> = peer_leaf_cert_der(&conn);
139 let peer_addr = conn.remote_address();
140
141 // Perform the wire-version handshake on the first bidi stream before
142 // dispatching any RPCs. The client opens a dedicated stream for this
143 // exchange; subsequent streams on the same connection are RPC streams.
144 let agreed_version = {
145 let accepted = tokio::select! {
146 biased;
147 _ = shutdown.changed() => {
148 if *shutdown.borrow() {
149 return Ok(());
150 }
151 // Spurious change — retry the accept.
152 conn.accept_bi().await
153 }
154 result = conn.accept_bi() => result,
155 };
156
157 let (mut hs_send, mut hs_recv) = match accepted {
158 Ok(streams) => streams,
159 Err(quinn::ConnectionError::ApplicationClosed(_)) => return Ok(()),
160 Err(quinn::ConnectionError::LocallyClosed) => return Ok(()),
161 Err(e) => {
162 return Err(ClusterError::Transport {
163 detail: format!("accept handshake stream from {peer_addr}: {e}"),
164 });
165 }
166 };
167
168 let local = local_version_range();
169 match perform_version_handshake_server(&conn, &mut hs_send, &mut hs_recv).await {
170 Ok(v) => v,
171 Err(e) => {
172 warn!(
173 peer_addr = %peer_addr,
174 local_min = %local.min,
175 local_max = %local.max,
176 error = %e,
177 "wire version handshake failed; closing connection"
178 );
179 // perform_version_handshake_server already closed the QUIC
180 // connection on range mismatch; propagate the error so the
181 // caller logs it and the connection task exits.
182 return Err(e);
183 }
184 }
185 };
186
187 debug!(
188 peer_addr = %peer_addr,
189 agreed_version = %agreed_version,
190 "wire version handshake complete"
191 );
192
193 loop {
194 let accepted = tokio::select! {
195 biased;
196 _ = shutdown.changed() => {
197 if *shutdown.borrow() {
198 return Ok(());
199 }
200 continue;
201 }
202 result = conn.accept_bi() => result,
203 };
204
205 let (send, recv) = match accepted {
206 Ok(streams) => streams,
207 Err(quinn::ConnectionError::ApplicationClosed(_)) => return Ok(()),
208 Err(quinn::ConnectionError::LocallyClosed) => return Ok(()),
209 Err(e) => {
210 return Err(ClusterError::Transport {
211 detail: format!("accept_bi: {e}"),
212 });
213 }
214 };
215
216 let h = handler.clone();
217 let stream_shutdown = shutdown.clone();
218 let stream_auth = auth.clone();
219 let stream_id_store = identity_store.clone();
220 let stream_cert = peer_cert_der.clone();
221 let conn_clone = conn.clone();
222 tokio::spawn(async move {
223 if let Err(e) = handle_stream(
224 h,
225 stream_auth,
226 stream_id_store,
227 stream_cert,
228 conn_clone,
229 send,
230 recv,
231 stream_shutdown,
232 )
233 .await
234 {
235 debug!(error = %e, "raft RPC stream error");
236 }
237 });
238 }
239}
240
/// Handle a single bidi stream: read request → dispatch → write response.
///
/// Every long-lived await is racing a shutdown signal — see the
/// module docstring for the rationale. On shutdown the in-flight
/// `work` future is simply dropped by the outer `select!`.
///
/// # Errors
///
/// Returns `ClusterError::Transport` on stream I/O failures or on a
/// peer-identity mismatch, plus whatever errors envelope parsing,
/// replay rejection, RPC decoding, or the handler itself surface.
#[allow(clippy::too_many_arguments)]
async fn handle_stream<H: RaftRpcHandler, S: PeerIdentityStore>(
    handler: Arc<H>,
    auth: Arc<AuthContext>,
    identity_store: Arc<S>,
    peer_cert_der: Option<Vec<u8>>,
    conn: quinn::Connection,
    mut send: quinn::SendStream,
    mut recv: quinn::RecvStream,
    mut shutdown: watch::Receiver<bool>,
) -> Result<()> {
    let work = async {
        // 1. Read one envelope.
        let envelope = read_envelope(&mut recv).await?;
        // 2 (MAC). parse_envelope verifies the envelope MAC against the
        // cluster key before anything else trusts `fields`.
        let (fields, inner_frame) = auth_envelope::parse_envelope(&envelope, &auth.mac_key)?;

        // 2. Replay window — under the advertised from_node_id (MAC-verified).
        //    Self-addressed frames skip the window: when a node dispatches
        //    an RPC to itself over the transport, the shared `AuthContext`
        //    means one window is updated by both the server-side request
        //    accept (here) and the client-side response accept (in
        //    `send.rs::parse_inbound`). Skipping when `from == local`
        //    keeps the two flows from tripping on each other's entries —
        //    a self-addressed frame can't have been replayed by an
        //    external attacker by definition.
        if fields.from_node_id != auth.local_node_id {
            auth.peer_seq_in.accept(fields.from_node_id, fields.seq)?;
        }

        // 3b. Peer identity check — binds the MAC-verified node_id to the
        //     TLS certificate. Self-addressed frames skip the check by the
        //     same reasoning as the replay window above. A missing cert
        //     (insecure transport) also skips the check.
        if fields.from_node_id != auth.local_node_id
            && let Some(cert_der) = &peer_cert_der
        {
            let node_info = identity_store.get_node_info(fields.from_node_id);
            match node_info {
                Some(ref info) => match verify_peer_identity(info, cert_der) {
                    VerifyOutcome::Accepted { method } => {
                        debug!(
                            node_id = fields.from_node_id,
                            ?method,
                            "peer identity verified"
                        );
                    }
                    VerifyOutcome::BootstrapAccepted => {
                        warn!(
                            node_id = fields.from_node_id,
                            "peer identity not pinned — bootstrap window accepted"
                        );
                    }
                    VerifyOutcome::Rejected => {
                        warn!(
                            node_id = fields.from_node_id,
                            "peer identity mismatch — closing connection"
                        );
                        // Kill the whole connection, not just this stream:
                        // every stream from this peer carries the same cert.
                        conn.close(IDENTITY_MISMATCH_QUIC_ERROR, b"peer identity mismatch");
                        return Err(ClusterError::Transport {
                            detail: format!(
                                "peer identity mismatch for node {}",
                                fields.from_node_id
                            ),
                        });
                    }
                },
                None => {
                    // Node not yet in topology — bootstrap window.
                    warn!(
                        node_id = fields.from_node_id,
                        "node not in topology — bootstrap window accepted"
                    );
                }
            }
        }

        // 4. Decode inner RPC and hand to handler.
        let request = rpc_codec::decode(inner_frame)?;
        let response = handler.handle_rpc(request).await?;

        // 5. Wrap the response in its own envelope. `from = local_node_id`,
        //    `seq = next outbound seq scoped to the caller`.
        let response_inner = rpc_codec::encode(&response)?;
        let response_seq = auth.peer_seq_out.next();
        let mut response_envelope =
            Vec::with_capacity(auth_envelope::ENVELOPE_OVERHEAD + response_inner.len());
        auth_envelope::write_envelope(
            auth.local_node_id,
            response_seq,
            &response_inner,
            &auth.mac_key,
            &mut response_envelope,
        )?;

        send.write_all(&response_envelope)
            .await
            .map_err(|e| ClusterError::Transport {
                detail: format!("write response: {e}"),
            })?;
        send.finish().map_err(|e| ClusterError::Transport {
            detail: format!("finish response: {e}"),
        })?;
        Ok::<(), ClusterError>(())
    };

    // Race the whole exchange against shutdown; shutdown drops `work`.
    tokio::select! {
        biased;
        _ = shutdown.changed() => Ok(()),
        result = work => result,
    }
}
355
356/// Read a complete authenticated envelope from a QUIC receive stream.
357///
358/// Reads the fixed envelope pre-header (version + from_node_id + seq +
359/// inner_len), then the inner frame, then the MAC tag. Returns the full
360/// envelope bytes for caller-side parsing.
361pub(crate) async fn read_envelope(recv: &mut quinn::RecvStream) -> Result<Vec<u8>> {
362 // Envelope header is version(1) + from_node_id(8) + seq(8) + inner_len(4).
363 const ENV_HDR_LEN: usize = 21;
364
365 let mut hdr = [0u8; ENV_HDR_LEN];
366 recv.read_exact(&mut hdr)
367 .await
368 .map_err(|e| ClusterError::Transport {
369 detail: format!("read envelope header: {e}"),
370 })?;
371
372 let inner_len = u32::from_le_bytes([hdr[17], hdr[18], hdr[19], hdr[20]]);
373 if inner_len > MAX_RPC_PAYLOAD_SIZE {
374 return Err(ClusterError::Codec {
375 detail: format!(
376 "envelope inner length {inner_len} exceeds maximum {MAX_RPC_PAYLOAD_SIZE}"
377 ),
378 });
379 }
380
381 let total = ENV_HDR_LEN + inner_len as usize + rpc_codec::MAC_LEN;
382 let mut buf = vec![0u8; total];
383 buf[..ENV_HDR_LEN].copy_from_slice(&hdr);
384 if total > ENV_HDR_LEN {
385 recv.read_exact(&mut buf[ENV_HDR_LEN..])
386 .await
387 .map_err(|e| ClusterError::Transport {
388 detail: format!("read envelope payload+mac: {e}"),
389 })?;
390 }
391
392 Ok(buf)
393}