nodedb-cluster 0.1.0

Distributed coordination layer for NodeDB — vShards, QUIC transport, and replication
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
// SPDX-License-Identifier: BUSL-1.1

//! QUIC and TLS configuration for Raft RPCs.

use std::sync::Arc;
use std::sync::Once;
use std::time::Duration;

use nodedb_types::config::tuning::ClusterTransportTuning;

use crate::error::{ClusterError, Result};
use crate::transport::pinned_verifier::{PinnedClientVerifier, PinnedServerVerifier};
use crate::transport::server::PeerIdentityStore;

/// Register rustls' `ring` crypto provider as the process-wide default,
/// attempting the registration at most once through this function.
///
/// rustls 0.23 refuses to build any `ServerConfig` / `ClientConfig` until a
/// process-level provider is registered. In production the bootstrap
/// listener installs it early, but callers that generate certs or build a
/// TLS config before the bootstrap runs (e.g. integration tests that
/// exercise only the transport layer) would otherwise hit a panic. Every
/// rustls-touching entry point in this module calls this function first,
/// so all of those paths are covered.
///
/// The result of `install_default` is deliberately discarded: an `Err`
/// only reports that a default provider was already registered by some
/// other code path, which is just as good for our purposes.
pub(crate) fn ensure_rustls_crypto_provider() {
    /// One-shot installer; a failure means a provider is already in place.
    fn install_ring_provider() {
        let _ = rustls::crypto::ring::default_provider().install_default();
    }

    static INSTALL_GUARD: Once = Once::new();
    INSTALL_GUARD.call_once(install_ring_provider);
}

/// ALPN protocol identifier for NodeDB Raft RPCs.
///
/// The four config builders in this file (insecure and mTLS, client and
/// server) each set `alpn_protocols` to exactly this single value.
pub const ALPN_NODEDB_RAFT: &[u8] = b"nodedb-raft/1";

/// SNI hostname used for QUIC connections between NodeDB nodes.
///
/// [`issue_leaf_for_sans`] appends this name to a leaf certificate's SAN
/// list whenever the caller did not already include it.
pub const SNI_HOSTNAME: &str = "nodedb";

/// Default RPC timeout.
///
/// Matches `ClusterTransportTuning::rpc_timeout_secs` default. Override by
/// constructing `NexarTransport::with_tuning()` with a custom `ClusterTransportTuning`.
// NOTE(review): the tuning default lives outside this file — confirm the two
// values are still in sync when changing either one.
pub const DEFAULT_RPC_TIMEOUT: Duration = Duration::from_secs(5);

/// Build the QUIC transport parameters used for Raft RPCs in a datacenter.
///
/// Stream limits, flow-control windows, keep-alive interval and idle
/// timeout are all taken from `tuning`. The initial RTT estimate is the one
/// exception: it is fixed at 100 µs regardless of `tuning`.
///
/// # Panics
///
/// Panics if `tuning.quic_idle_timeout_secs` is too large to be converted
/// into the idle-timeout type accepted by `max_idle_timeout`.
pub fn raft_transport_config(tuning: &ClusterTransportTuning) -> quinn::TransportConfig {
    let mut transport = quinn::TransportConfig::default();
    transport
        // Raft RPCs open one bidirectional stream per request/response pair.
        .max_concurrent_bidi_streams(quinn::VarInt::from_u32(tuning.quic_max_bi_streams))
        // Unidirectional streams are reserved for future migration/snapshot streaming.
        .max_concurrent_uni_streams(quinn::VarInt::from_u32(tuning.quic_max_uni_streams))
        // Datacenter profile: generous flow-control windows, very low RTT estimate.
        .receive_window(quinn::VarInt::from_u32(tuning.quic_receive_window))
        .send_window(u64::from(tuning.quic_send_window))
        .stream_receive_window(quinn::VarInt::from_u32(tuning.quic_stream_receive_window))
        .initial_rtt(Duration::from_micros(100))
        .keep_alive_interval(Some(Duration::from_secs(tuning.quic_keep_alive_secs)))
        .max_idle_timeout(Some(
            Duration::from_secs(tuning.quic_idle_timeout_secs)
                .try_into()
                .expect("idle timeout fits IdleTimeout"),
        ));
    transport
}

/// Build a QUIC server config with self-signed TLS (unauthenticated).
///
/// **Crate-private** — reachable only via
/// [`TransportCredentials::Insecure`](super::credentials::TransportCredentials::Insecure),
/// which logs a loud startup warning and bumps
/// [`insecure_transport_count`](super::credentials::insecure_transport_count).
/// Production clusters use the mTLS variant.
pub(crate) fn make_raft_server_config(
    tuning: &ClusterTransportTuning,
) -> Result<quinn::ServerConfig> {
    ensure_rustls_crypto_provider();
    let (cert, key) = nexar::transport::tls::generate_self_signed_cert().map_err(|e| {
        ClusterError::Transport {
            detail: format!("generate cert: {e}"),
        }
    })?;

    let provider = rustls::crypto::ring::default_provider();
    let mut tls_config = rustls::ServerConfig::builder_with_provider(Arc::new(provider))
        .with_safe_default_protocol_versions()
        .map_err(|e| ClusterError::Transport {
            detail: format!("server TLS protocol versions: {e}"),
        })?
        .with_no_client_auth()
        .with_single_cert(vec![cert], key)
        .map_err(|e| ClusterError::Transport {
            detail: format!("server TLS config: {e}"),
        })?;

    tls_config.alpn_protocols = vec![ALPN_NODEDB_RAFT.to_vec()];

    let quic_crypto = quinn::crypto::rustls::QuicServerConfig::try_from(Arc::new(tls_config))
        .map_err(|e| ClusterError::Transport {
            detail: format!("QUIC server config: {e}"),
        })?;

    let mut server_config = quinn::ServerConfig::with_crypto(Arc::new(quic_crypto));
    server_config.transport_config(Arc::new(raft_transport_config(tuning)));
    Ok(server_config)
}

/// Build a QUIC client config that skips server verification (unauthenticated).
///
/// **Crate-private** — reachable only via
/// [`TransportCredentials::Insecure`](super::credentials::TransportCredentials::Insecure),
/// which logs a loud startup warning and bumps
/// [`insecure_transport_count`](super::credentials::insecure_transport_count).
/// Production clusters use the mTLS variant.
pub(crate) fn make_raft_client_config(
    tuning: &ClusterTransportTuning,
) -> Result<quinn::ClientConfig> {
    ensure_rustls_crypto_provider();
    let provider = rustls::crypto::ring::default_provider();
    let mut tls_config = rustls::ClientConfig::builder_with_provider(Arc::new(provider))
        .with_safe_default_protocol_versions()
        .map_err(|e| ClusterError::Transport {
            detail: format!("client TLS protocol versions: {e}"),
        })?
        .dangerous()
        .with_custom_certificate_verifier(Arc::new(SkipServerVerification))
        .with_no_client_auth();

    tls_config.alpn_protocols = vec![ALPN_NODEDB_RAFT.to_vec()];

    let quic_crypto = quinn::crypto::rustls::QuicClientConfig::try_from(Arc::new(tls_config))
        .map_err(|e| ClusterError::Transport {
            detail: format!("QUIC client config: {e}"),
        })?;

    let mut client_config = quinn::ClientConfig::new(Arc::new(quic_crypto));
    client_config.transport_config(Arc::new(raft_transport_config(tuning)));
    Ok(client_config)
}

/// TLS credentials for a node (used for mTLS in production).
///
/// Consumed by [`make_raft_server_config_mtls`] and
/// [`make_raft_client_config_mtls`]; produced by [`issue_leaf_for_sans`].
///
/// Note: this type derives neither `Debug` nor `Clone`. `key` and
/// `cluster_secret` are secret material, so avoid adding a `Debug` impl
/// that would print them.
pub struct TlsCredentials {
    /// This node's own leaf certificate. Presented to peers by both mTLS
    /// config builders (as the server cert and as the client-auth cert).
    pub cert: rustls::pki_types::CertificateDer<'static>,
    /// Private key belonging to `cert`. Secret material.
    pub key: rustls::pki_types::PrivateKeyDer<'static>,
    /// The primary CA that signed this node's own `cert`. Always the
    /// currently-active issuer for freshly-issued node certs.
    pub ca_cert: rustls::pki_types::CertificateDer<'static>,
    /// Additional CAs to trust as peer-cert issuers, beyond `ca_cert`.
    /// Populated during a rotation overlap window (L.4): the new CA is
    /// added, every node rebuilds its rustls config with *both* CAs in
    /// the root store, the operator reissues node certs signed by the
    /// new CA, and finally the old CA is removed. Rustls' `RootCertStore`
    /// treats every anchor as an acceptable issuer, so peers presenting
    /// certs signed by any of them handshake successfully.
    pub additional_ca_certs: Vec<rustls::pki_types::CertificateDer<'static>>,
    /// Certificate Revocation Lists in DER format; may be empty.
    /// The mTLS server config builder passes every entry to its
    /// client-certificate verifier, so revoked peer certificates are
    /// rejected during the handshake. Freshly issued credentials start
    /// with an empty list.
    pub crls: Vec<rustls::pki_types::CertificateRevocationListDer<'static>>,
    /// Cluster-wide 32-byte symmetric secret used as the HMAC-SHA256 key for
    /// the authenticated frame envelope (see
    /// [`crate::rpc_codec::auth_envelope`]). Generated at bootstrap, persisted
    /// under `data_dir/tls/cluster_secret.bin`, distributed to joining nodes
    /// via the join RPC (L.4). Treat as key material — never log, always
    /// 0600 at rest.
    pub cluster_secret: [u8; 32],
    /// SHA-256 digest of this node's own SubjectPublicKeyInfo DER blob.
    /// Computed from `cert` at issuance time by [`issue_leaf_for_sans`].
    /// Stable across cert renewals that reuse the same key-pair; changes on
    /// key rotation.  Transmitted in `JoinRequest` so peers can pin us.
    // NOTE(review): `issue_leaf_for_sans` falls back to an all-zero pin when
    // the SPKI cannot be extracted — consumers should not treat all zeros as
    // a real pin.
    pub spki_pin: [u8; 32],
}

/// Build a QUIC server config with mutual TLS (production mode).
///
/// Requires connecting clients to present a certificate signed by the cluster CA.
/// When `identity_store` contains topology entries, the `PinnedClientVerifier`
/// wraps the WebPki chain verifier and additionally checks the connecting
/// client's SPKI fingerprint or SPIFFE URI SAN against the topology pin.
/// Connections from nodes not yet in the topology are accepted in the bootstrap
/// window (defense-in-depth: `verify_peer_identity` at the app layer also fires).
pub fn make_raft_server_config_mtls(
    creds: &TlsCredentials,
    tuning: &ClusterTransportTuning,
    identity_store: Arc<dyn PeerIdentityStore>,
) -> Result<quinn::ServerConfig> {
    ensure_rustls_crypto_provider();
    let mut root_store = rustls::RootCertStore::empty();
    root_store
        .add(creds.ca_cert.clone())
        .map_err(|e| ClusterError::Transport {
            detail: format!("add CA to root store: {e}"),
        })?;
    // Overlap-window CAs (L.4). Every additional anchor is an
    // acceptable peer-cert issuer for the lifetime of this config;
    // rotation cutover happens by rebuilding the config with the
    // old CA dropped from `additional_ca_certs`.
    for extra in &creds.additional_ca_certs {
        root_store
            .add(extra.clone())
            .map_err(|e| ClusterError::Transport {
                detail: format!("add overlap CA to root store: {e}"),
            })?;
    }

    let mut verifier_builder = rustls::server::WebPkiClientVerifier::builder(Arc::new(root_store));

    // Add CRLs for certificate revocation checking.
    for crl in &creds.crls {
        verifier_builder = verifier_builder.with_crls(vec![crl.clone()]);
    }

    let inner_verifier = verifier_builder
        .build()
        .map_err(|e| ClusterError::Transport {
            detail: format!("build client verifier: {e}"),
        })?;

    let pinned_verifier = Arc::new(PinnedClientVerifier {
        inner: inner_verifier,
        identity_store,
    });

    let provider = rustls::crypto::ring::default_provider();
    let mut tls_config = rustls::ServerConfig::builder_with_provider(Arc::new(provider))
        .with_safe_default_protocol_versions()
        .map_err(|e| ClusterError::Transport {
            detail: format!("server TLS protocol versions: {e}"),
        })?
        .with_client_cert_verifier(pinned_verifier)
        .with_single_cert(vec![creds.cert.clone()], creds.key.clone_key())
        .map_err(|e| ClusterError::Transport {
            detail: format!("mTLS server config: {e}"),
        })?;

    tls_config.alpn_protocols = vec![ALPN_NODEDB_RAFT.to_vec()];

    let quic_crypto = quinn::crypto::rustls::QuicServerConfig::try_from(Arc::new(tls_config))
        .map_err(|e| ClusterError::Transport {
            detail: format!("QUIC mTLS server config: {e}"),
        })?;

    let mut server_config = quinn::ServerConfig::with_crypto(Arc::new(quic_crypto));
    server_config.transport_config(Arc::new(raft_transport_config(tuning)));
    Ok(server_config)
}

/// Build a QUIC client config with mutual TLS (production mode).
///
/// Verifies the server certificate and presents this node's client
/// certificate, both signed by the cluster CA (`creds.ca_cert`) or by one of
/// the rotation-overlap CAs in `creds.additional_ca_certs`.
///
/// Revocation: every CRL in `creds.crls` is handed to the WebPki server
/// verifier, mirroring what the mTLS server config does for client
/// certificates, so a revoked server certificate is rejected during the
/// handshake. With an empty `crls` list no revocation checking takes place.
/// Note that rustls' default policy treats an *unknown* revocation status as
/// an error once any CRL is configured, so during a CA overlap window the
/// CRL set should cover every trusted issuer.
///
/// When `identity_store` contains topology entries, the
/// `PinnedServerVerifier` wraps the standard WebPki verifier and
/// additionally checks the server's SPKI fingerprint or SPIFFE URI SAN
/// against the pinned topology entry, if any. Unknown server SPKI is
/// accepted (bootstrap window).
///
/// # Errors
///
/// Returns `ClusterError::Transport` naming the failing step if a CA cannot
/// be added to the root store, the verifier cannot be built (including an
/// unparsable CRL), or the TLS / QUIC configuration is rejected.
pub fn make_raft_client_config_mtls(
    creds: &TlsCredentials,
    tuning: &ClusterTransportTuning,
    identity_store: Arc<dyn PeerIdentityStore>,
) -> Result<quinn::ClientConfig> {
    ensure_rustls_crypto_provider();
    let mut root_store = rustls::RootCertStore::empty();
    root_store
        .add(creds.ca_cert.clone())
        .map_err(|e| ClusterError::Transport {
            detail: format!("add CA to root store: {e}"),
        })?;
    // Overlap-window CAs (L.4). Every additional anchor is an
    // acceptable peer-cert issuer for the lifetime of this config;
    // rotation cutover happens by rebuilding the config with the
    // old CA dropped from `additional_ca_certs`.
    for extra in &creds.additional_ca_certs {
        root_store
            .add(extra.clone())
            .map_err(|e| ClusterError::Transport {
                detail: format!("add overlap CA to root store: {e}"),
            })?;
    }

    // One provider instance, shared by the verifier and the TLS config.
    let provider = Arc::new(rustls::crypto::ring::default_provider());

    // WebPki-backed inner verifier: validates the chain and expiry, and —
    // now that the CRLs are actually registered — revocation as well.
    let inner_webpki = rustls::client::WebPkiServerVerifier::builder_with_provider(
        Arc::new(root_store),
        Arc::clone(&provider),
    )
    .with_crls(creds.crls.iter().cloned())
    .build()
    .map_err(|e| ClusterError::Transport {
        detail: format!("build server verifier: {e}"),
    })?;

    let pinned_verifier = Arc::new(PinnedServerVerifier {
        inner: inner_webpki,
        identity_store,
    });

    let mut tls_config = rustls::ClientConfig::builder_with_provider(provider)
        .with_safe_default_protocol_versions()
        .map_err(|e| ClusterError::Transport {
            detail: format!("client TLS protocol versions: {e}"),
        })?
        .dangerous()
        .with_custom_certificate_verifier(pinned_verifier)
        .with_client_auth_cert(vec![creds.cert.clone()], creds.key.clone_key())
        .map_err(|e| ClusterError::Transport {
            detail: format!("mTLS client config: {e}"),
        })?;

    tls_config.alpn_protocols = vec![ALPN_NODEDB_RAFT.to_vec()];

    let quic_crypto = quinn::crypto::rustls::QuicClientConfig::try_from(Arc::new(tls_config))
        .map_err(|e| ClusterError::Transport {
            detail: format!("QUIC mTLS client config: {e}"),
        })?;

    let mut client_config = quinn::ClientConfig::new(Arc::new(quic_crypto));
    client_config.transport_config(Arc::new(raft_transport_config(tuning)));
    Ok(client_config)
}

/// Generate a cluster CA and issue a node certificate.
///
/// Called during bootstrap. The CA cert is stored in the catalog and
/// distributed to joining nodes via the JoinResponse.
///
/// The issued leaf carries `node_san` plus the cluster SNI
/// ([`SNI_HOSTNAME`]). The SNI is not passed explicitly here: the issuing
/// path ([`issue_leaf_for_sans`]) appends it whenever it is missing, which
/// also avoids a duplicate SAN entry when `node_san` already equals the SNI.
///
/// # Errors
///
/// Propagates any `ClusterError::Transport` raised while generating the CA
/// or issuing the leaf certificate.
pub fn generate_node_credentials(
    node_san: &str,
) -> Result<(nexar::transport::tls::ClusterCa, TlsCredentials)> {
    generate_node_credentials_multi_san(&[node_san])
}

/// Multi-SAN variant of [`generate_node_credentials`]: generates a fresh
/// cluster CA and issues a leaf bound to every name in `sans`.
///
/// The first SAN becomes the primary subject; the cluster SNI `"nodedb"`
/// is added automatically if not already present so QUIC connects keep
/// working against the fixed SNI.
///
/// # Errors
///
/// Returns `ClusterError::Transport` if CA generation fails, and propagates
/// any error from [`issue_leaf_for_sans`] (including an empty `sans`).
pub fn generate_node_credentials_multi_san(
    sans: &[&str],
) -> Result<(nexar::transport::tls::ClusterCa, TlsCredentials)> {
    ensure_rustls_crypto_provider();

    let cluster_ca = match nexar::transport::tls::ClusterCa::generate() {
        Ok(generated) => generated,
        Err(e) => {
            return Err(ClusterError::Transport {
                detail: format!("generate cluster CA: {e}"),
            });
        }
    };

    // Issue under the borrowed CA, then hand ownership of the CA to the caller.
    let leaf = issue_leaf_for_sans(&cluster_ca, sans)?;
    Ok((cluster_ca, leaf))
}

/// Issue a leaf certificate for `sans` under an already-loaded CA and
/// bundle it with a freshly generated `cluster_secret`. Used by operator
/// tooling that reissues a per-node cert without regenerating the cluster
/// CA (`nodedb regen-certs`).
///
/// The cluster SNI ([`SNI_HOSTNAME`]) is appended to the SAN list when the
/// caller did not supply it. `additional_ca_certs` and `crls` of the
/// returned credentials start out empty.
///
/// Because the `cluster_secret` is always new, callers that want to keep
/// the existing secret should overwrite the returned credential's
/// `cluster_secret` field with the loaded value before persisting.
///
/// The CA is only borrowed and left untouched — no round-trip through DER,
/// no new owned `ClusterCa` synthesised. If an owned CA handle is needed
/// alongside the creds, use `generate_node_credentials_multi_san`, which
/// returns the freshly generated CA directly.
///
/// # Errors
///
/// Returns `ClusterError::Transport` when `sans` is empty or when the CA
/// fails to issue the certificate.
pub fn issue_leaf_for_sans(
    ca: &nexar::transport::tls::ClusterCa,
    sans: &[&str],
) -> Result<TlsCredentials> {
    use rand::RngCore;

    if sans.is_empty() {
        return Err(ClusterError::Transport {
            detail: "issue_leaf_for_sans requires at least one SAN".into(),
        });
    }

    // Caller-provided names first, then the fixed cluster SNI if absent.
    let mut names: Vec<&str> = Vec::with_capacity(sans.len() + 1);
    names.extend_from_slice(sans);
    if !names.contains(&SNI_HOSTNAME) {
        names.push(SNI_HOSTNAME);
    }

    let ca_cert = ca.cert_der();
    let (cert, key) = match ca.issue_cert_multi(&names) {
        Ok(pair) => pair,
        Err(e) => {
            return Err(ClusterError::Transport {
                detail: format!("issue node cert: {e}"),
            });
        }
    };

    let mut cluster_secret = [0u8; 32];
    rand::rng().fill_bytes(&mut cluster_secret);

    // NOTE(review): if the SPKI cannot be extracted the pin silently becomes
    // all zeros instead of surfacing an error — confirm that downstream
    // pinning treats a zero pin as "no pin" rather than a real fingerprint.
    let spki_pin = crate::transport::peer_identity_verifier::spki_pin_from_cert_der(cert.as_ref())
        .unwrap_or([0u8; 32]);

    Ok(TlsCredentials {
        cert,
        key,
        ca_cert,
        additional_ca_certs: Vec::new(),
        crls: Vec::new(),
        cluster_secret,
        spki_pin,
    })
}

/// Compute the SHA-256 digest (32 bytes) over the raw DER bytes of a CA
/// certificate.
///
/// Used as the stable identifier for CA trust add/remove operations.
pub fn ca_fingerprint(cert: &rustls::pki_types::CertificateDer<'_>) -> [u8; 32] {
    use sha2::{Digest, Sha256};
    // One-shot hash of the whole DER blob.
    Sha256::digest(cert.as_ref()).into()
}

/// Render the first 8 bytes of a CA fingerprint as 16 lowercase hex
/// characters.
///
/// Used as the filename stem under `data_dir/tls/ca.d/<fp>.crt`.
pub fn ca_fingerprint_hex(fp: &[u8; 32]) -> String {
    use std::fmt::Write as _;

    fp.iter()
        .take(8)
        .fold(String::with_capacity(16), |mut hex, byte| {
            // The formatting result is discarded, as there is nothing
            // useful to do with it when appending to an in-memory `String`.
            let _ = write!(hex, "{byte:02x}");
            hex
        })
}

/// Read a PEM file and return every CRL it contains as a DER blob.
///
/// The file may hold any number of CRLs; they are returned in file order.
///
/// # Errors
///
/// Returns `ClusterError::Transport` if the file cannot be read or if a PEM
/// section fails to parse. Parsing stops at the first failure.
pub fn load_crls_from_pem(
    path: &std::path::Path,
) -> Result<Vec<rustls::pki_types::CertificateRevocationListDer<'static>>> {
    let pem_bytes = match std::fs::read(path) {
        Ok(bytes) => bytes,
        Err(e) => {
            return Err(ClusterError::Transport {
                detail: format!("read CRL file {}: {e}", path.display()),
            });
        }
    };

    // A byte slice is itself a buffered reader, so it can feed the parser directly.
    let mut cursor: &[u8] = &pem_bytes;
    let mut parsed = Vec::new();
    for entry in rustls_pemfile::crls(&mut cursor) {
        match entry {
            Ok(crl) => parsed.push(crl),
            Err(e) => {
                return Err(ClusterError::Transport {
                    detail: format!("parse CRL from {}: {e}", path.display()),
                });
            }
        }
    }
    Ok(parsed)
}

/// Server-certificate verifier that approves everything it is shown
/// (dev/bootstrap only).
///
/// Used solely by the insecure client config in this file. Certificate,
/// TLS 1.2 signature and TLS 1.3 signature checks all succeed
/// unconditionally, so it provides no authentication whatsoever.
#[derive(Debug)]
struct SkipServerVerification;

impl rustls::client::danger::ServerCertVerifier for SkipServerVerification {
    /// Accept the presented certificate chain without inspecting it.
    fn verify_server_cert(
        &self,
        _end_entity: &rustls::pki_types::CertificateDer<'_>,
        _intermediates: &[rustls::pki_types::CertificateDer<'_>],
        _server_name: &rustls::pki_types::ServerName<'_>,
        _ocsp_response: &[u8],
        _now: rustls::pki_types::UnixTime,
    ) -> std::result::Result<rustls::client::danger::ServerCertVerified, rustls::Error> {
        Ok(rustls::client::danger::ServerCertVerified::assertion())
    }

    /// Accept any TLS 1.2 handshake signature.
    fn verify_tls12_signature(
        &self,
        _message: &[u8],
        _cert: &rustls::pki_types::CertificateDer<'_>,
        _dss: &rustls::DigitallySignedStruct,
    ) -> std::result::Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
        Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
    }

    /// Accept any TLS 1.3 handshake signature.
    fn verify_tls13_signature(
        &self,
        _message: &[u8],
        _cert: &rustls::pki_types::CertificateDer<'_>,
        _dss: &rustls::DigitallySignedStruct,
    ) -> std::result::Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
        Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
    }

    /// Advertise the signature schemes of the process-default crypto
    /// provider, falling back to a fresh `ring` provider when no default
    /// has been installed.
    fn supported_verify_schemes(&self) -> Vec<rustls::SignatureScheme> {
        match rustls::crypto::CryptoProvider::get_default() {
            Some(installed) => installed
                .signature_verification_algorithms
                .supported_schemes(),
            None => rustls::crypto::ring::default_provider()
                .signature_verification_algorithms
                .supported_schemes(),
        }
    }
}