Skip to main content

rmcp_server_kit/
mtls_revocation.rs

1//! CDP-driven CRL revocation support for mTLS.
2//!
3//! When mTLS is configured with CRL checks enabled, startup performs a bounded
4//! bootstrap pass over the configured CA bundle, extracts CRL Distribution
5//! Point (CDP) URLs, fetches reachable CRLs, and builds the initial inner
6//! `rustls` verifier from that cache.
7//!
8//! During handshakes, the outer verifier remains stable for the lifetime of the
9//! TLS acceptor while its inner `WebPkiClientVerifier` is swapped atomically via
10//! `ArcSwap` as CRLs are discovered or refreshed. Discovery from connecting
11//! client certificates is fire-and-forget and never blocks the synchronous
12//! handshake path.
13//!
14//! Semantics:
15//! - `crl_deny_on_unavailable = false` => fail open with warn logs.
16//! - `crl_deny_on_unavailable = true` => fail closed when a certificate
17//!   advertises CDP URLs whose revocation status is not yet available.
18
19use std::{
20    collections::{HashMap, HashSet},
21    num::NonZeroU32,
22    pin::Pin,
23    sync::{Arc, Mutex},
24    time::{Duration, SystemTime, UNIX_EPOCH},
25};
26
27use arc_swap::ArcSwap;
28use governor::{DefaultDirectRateLimiter, Quota, RateLimiter};
29use rustls::{
30    DigitallySignedStruct, DistinguishedName, Error as TlsError, RootCertStore, SignatureScheme,
31    client::danger::HandshakeSignatureValid,
32    pki_types::{CertificateDer, CertificateRevocationListDer, UnixTime},
33    server::{
34        WebPkiClientVerifier,
35        danger::{ClientCertVerified, ClientCertVerifier},
36    },
37};
38use tokio::{
39    net::lookup_host,
40    sync::{RwLock, Semaphore, mpsc},
41    task::JoinSet,
42    time::{Instant, Sleep},
43};
44use tokio_util::sync::CancellationToken;
45use url::Url;
46use x509_parser::{
47    extensions::{DistributionPointName, GeneralName, ParsedExtension},
48    prelude::{FromDer, X509Certificate},
49    revocation_list::CertificateRevocationList,
50};
51
52use crate::{
53    auth::MtlsConfig,
54    error::McpxError,
55    ssrf::{check_scheme, ip_block_reason},
56};
57
58const BOOTSTRAP_TIMEOUT: Duration = Duration::from_secs(10);
59const MIN_AUTO_REFRESH: Duration = Duration::from_mins(10);
60const MAX_AUTO_REFRESH: Duration = Duration::from_hours(24);
61/// Connection timeout for CRL HTTP fetches. Independent of overall fetch
62/// timeout to bound time spent on unreachable hosts.
63const CRL_CONNECT_TIMEOUT: Duration = Duration::from_secs(3);
64
65/// Parsed CRL cached in memory and keyed by its source URL.
66#[derive(Clone, Debug)]
67#[non_exhaustive]
68pub struct CachedCrl {
69    /// DER bytes for the CRL.
70    pub der: CertificateRevocationListDer<'static>,
71    /// `thisUpdate` field from the CRL.
72    pub this_update: SystemTime,
73    /// `nextUpdate` field from the CRL, if present.
74    pub next_update: Option<SystemTime>,
75    /// Time the server fetched this CRL.
76    pub fetched_at: SystemTime,
77    /// Source URL used for retrieval.
78    pub source_url: String,
79}
80
81pub(crate) struct VerifierHandle(pub Arc<dyn ClientCertVerifier>);
82
83impl std::fmt::Debug for VerifierHandle {
84    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
85        f.debug_struct("VerifierHandle").finish_non_exhaustive()
86    }
87}
88
89/// Shared CRL state backing the dynamic mTLS verifier.
90#[allow(
91    missing_debug_implementations,
92    reason = "contains ArcSwap and dyn verifier internals"
93)]
94#[non_exhaustive]
95pub struct CrlSet {
96    inner_verifier: ArcSwap<VerifierHandle>,
97    /// Cached CRLs keyed by URL.
98    pub cache: RwLock<HashMap<String, CachedCrl>>,
99    /// Immutable client-auth root store.
100    pub roots: Arc<RootCertStore>,
101    /// mTLS CRL configuration.
102    pub config: MtlsConfig,
103    /// Fire-and-forget discovery channel for newly-seen CDP URLs.
104    pub discover_tx: mpsc::UnboundedSender<String>,
105    client: reqwest::Client,
106    seen_urls: Mutex<HashSet<String>>,
107    cached_urls: Mutex<HashSet<String>>,
108    /// Global cap on simultaneous CRL HTTP fetches (SSRF amplification guard).
109    global_fetch_sem: Arc<Semaphore>,
110    /// Per-host serializer (one in-flight fetch per origin host).
111    host_semaphores: Arc<tokio::sync::Mutex<HashMap<String, Arc<Semaphore>>>>,
112    /// Global rate-limiter on discovery URL submissions; protects against
113    /// cert-driven URL flooding by a malicious mTLS peer.
114    ///
115    /// Note: this ships as a process-global limiter; per-source-IP scoping
116    /// is deferred to a future release because the rustls
117    /// `verify_client_cert` callback does not carry a `SocketAddr` for the
118    /// peer.
119    discovery_limiter: Arc<DefaultDirectRateLimiter>,
120    /// Cached cap on per-fetch response body size; copied from `config` so the
121    /// hot path doesn't re-read the (rarely changing) config struct.
122    max_response_bytes: u64,
123    last_cap_warn: Mutex<HashMap<&'static str, Instant>>,
124}
125
126impl CrlSet {
127    fn new(
128        roots: Arc<RootCertStore>,
129        config: MtlsConfig,
130        discover_tx: mpsc::UnboundedSender<String>,
131        initial_cache: HashMap<String, CachedCrl>,
132    ) -> Result<Arc<Self>, McpxError> {
133        let client = reqwest::Client::builder()
134            .timeout(config.crl_fetch_timeout)
135            .connect_timeout(CRL_CONNECT_TIMEOUT)
136            .tcp_keepalive(None)
137            .redirect(reqwest::redirect::Policy::none())
138            .user_agent(format!("rmcp-server-kit/{}", env!("CARGO_PKG_VERSION")))
139            .build()
140            .map_err(|error| McpxError::Startup(format!("CRL HTTP client init: {error}")))?;
141
142        let initial_verifier = rebuild_verifier(&roots, &config, &initial_cache)?;
143        let seen_urls = initial_cache.keys().cloned().collect::<HashSet<_>>();
144        let cached_urls = seen_urls.clone();
145
146        let concurrency = config.crl_max_concurrent_fetches.max(1);
147        let global_fetch_sem = Arc::new(Semaphore::new(concurrency));
148        let host_semaphores = Arc::new(tokio::sync::Mutex::new(HashMap::new()));
149
150        let rate =
151            NonZeroU32::new(config.crl_discovery_rate_per_min.max(1)).unwrap_or(NonZeroU32::MIN);
152        let discovery_limiter = Arc::new(RateLimiter::direct(Quota::per_minute(rate)));
153
154        let max_response_bytes = config.crl_max_response_bytes;
155
156        Ok(Arc::new(Self {
157            inner_verifier: ArcSwap::from_pointee(VerifierHandle(initial_verifier)),
158            cache: RwLock::new(initial_cache),
159            roots,
160            config,
161            discover_tx,
162            client,
163            seen_urls: Mutex::new(seen_urls),
164            cached_urls: Mutex::new(cached_urls),
165            global_fetch_sem,
166            host_semaphores,
167            discovery_limiter,
168            max_response_bytes,
169            last_cap_warn: Mutex::new(HashMap::new()),
170        }))
171    }
172
173    fn warn_cap_exceeded_throttled(&self, which: &'static str) {
174        let now = Instant::now();
175        let cooldown = Duration::from_mins(1);
176        let should_warn = match self.last_cap_warn.lock() {
177            Ok(mut guard) => {
178                let should_emit = guard
179                    .get(which)
180                    .is_none_or(|last| now.saturating_duration_since(*last) >= cooldown);
181                if should_emit {
182                    guard.insert(which, now);
183                }
184                should_emit
185            }
186            Err(poisoned) => {
187                let mut guard = poisoned.into_inner();
188                let should_emit = guard
189                    .get(which)
190                    .is_none_or(|last| now.saturating_duration_since(*last) >= cooldown);
191                if should_emit {
192                    guard.insert(which, now);
193                }
194                should_emit
195            }
196        };
197
198        if should_warn {
199            tracing::warn!(which = which, "CRL map cap exceeded; dropping newest entry");
200        }
201    }
202
203    async fn insert_cache_entry(&self, url: String, cached: CachedCrl) -> bool {
204        let inserted = {
205            let mut guard = self.cache.write().await;
206            if guard.len() >= self.config.crl_max_cache_entries && !guard.contains_key(&url) {
207                false
208            } else {
209                guard.insert(url.clone(), cached);
210                true
211            }
212        };
213
214        if inserted {
215            match self.cached_urls.lock() {
216                Ok(mut cached_urls) => {
217                    cached_urls.insert(url);
218                }
219                Err(poisoned) => {
220                    poisoned.into_inner().insert(url);
221                }
222            }
223        } else {
224            self.warn_cap_exceeded_throttled("cache");
225        }
226
227        inserted
228    }
229
230    /// Force an immediate refresh of all currently known CRL URLs.
231    ///
232    /// # Errors
233    ///
234    /// Returns an error if rebuilding the inner verifier fails.
235    pub async fn force_refresh(&self) -> Result<(), McpxError> {
236        let urls = {
237            let cache = self.cache.read().await;
238            cache.keys().cloned().collect::<Vec<_>>()
239        };
240        self.refresh_urls(urls).await
241    }
242
243    async fn refresh_due_urls(&self) -> Result<(), McpxError> {
244        let now = SystemTime::now();
245        let urls = {
246            let cache = self.cache.read().await;
247            cache
248                .iter()
249                .filter(|(_, cached)| {
250                    should_refresh_cached(cached, now, self.config.crl_refresh_interval)
251                })
252                .map(|(url, _)| url.clone())
253                .collect::<Vec<_>>()
254        };
255
256        if urls.is_empty() {
257            return Ok(());
258        }
259
260        self.refresh_urls(urls).await
261    }
262
263    async fn refresh_urls(&self, urls: Vec<String>) -> Result<(), McpxError> {
264        let results = self.fetch_url_results(urls).await;
265        let now = SystemTime::now();
266        let mut cache = self.cache.write().await;
267        let mut changed = false;
268
269        for (url, result) in results {
270            match result {
271                Ok(cached) => {
272                    if cache.len() >= self.config.crl_max_cache_entries && !cache.contains_key(&url)
273                    {
274                        drop(cache);
275                        self.warn_cap_exceeded_throttled("cache");
276                        cache = self.cache.write().await;
277                        continue;
278                    }
279                    cache.insert(url.clone(), cached);
280                    changed = true;
281                    match self.cached_urls.lock() {
282                        Ok(mut cached_urls) => {
283                            cached_urls.insert(url);
284                        }
285                        Err(poisoned) => {
286                            poisoned.into_inner().insert(url);
287                        }
288                    }
289                }
290                Err(error) => {
291                    let remove_entry = cache.get(&url).is_some_and(|existing| {
292                        existing
293                            .next_update
294                            .and_then(|next| next.checked_add(self.config.crl_stale_grace))
295                            .is_some_and(|deadline| now > deadline)
296                    });
297                    tracing::warn!(url = %url, error = %error, "CRL refresh failed");
298                    if remove_entry {
299                        cache.remove(&url);
300                        changed = true;
301                        match self.cached_urls.lock() {
302                            Ok(mut cached_urls) => {
303                                cached_urls.remove(&url);
304                            }
305                            Err(poisoned) => {
306                                poisoned.into_inner().remove(&url);
307                            }
308                        }
309                        match self.seen_urls.lock() {
310                            Ok(mut seen_urls) => {
311                                seen_urls.remove(&url);
312                            }
313                            Err(poisoned) => {
314                                poisoned.into_inner().remove(&url);
315                            }
316                        }
317                    }
318                }
319            }
320        }
321
322        if changed {
323            self.swap_verifier_from_cache(&cache)?;
324        }
325
326        Ok(())
327    }
328
329    async fn fetch_and_store_url(&self, url: String) -> Result<(), McpxError> {
330        let cached = gated_fetch(
331            &self.client,
332            &self.global_fetch_sem,
333            &self.host_semaphores,
334            &url,
335            self.config.crl_allow_http,
336            self.max_response_bytes,
337            self.config.crl_max_host_semaphores,
338        )
339        .await?;
340        if !self.insert_cache_entry(url, cached).await {
341            return Ok(());
342        }
343        let cache = self.cache.read().await;
344        self.swap_verifier_from_cache(&cache)?;
345        Ok(())
346    }
347
348    fn note_discovered_urls(&self, urls: &[String]) -> bool {
349        // INVARIANT: only called post-handshake from
350        // `DynamicClientCertVerifier::verify_client_cert`. The peer has
351        // already presented a chain that parses; this method must not panic
352        // under attacker-controlled URL contents.
353        let mut missing_cached = false;
354
355        // Snapshot the dedup set under the lock; do NOT mutate it yet.
356        // We promote a URL to "seen" only after it is actually admitted
357        // by the rate-limiter and queued on the discover channel.
358        // Otherwise a single rate-limited handshake would permanently
359        // black-hole the URL: every subsequent handshake would see it as
360        // "already known" and skip the limiter entirely, while the
361        // background fetcher would never have received it. With
362        // `crl_deny_on_unavailable = true` that produces persistent
363        // handshake failures; with fail-open it silently disables CRL
364        // discovery for that endpoint forever.
365        let candidates: Vec<String> = match self.seen_urls.lock() {
366            Ok(seen) => urls
367                .iter()
368                .filter(|url| !seen.contains(*url))
369                .cloned()
370                .collect(),
371            Err(_) => Vec::new(),
372        };
373
374        // Rate-limit gate: drop excess submissions on the floor with a WARN.
375        // The mTLS verifier must remain non-blocking, so we use the
376        // synchronous `check()` API and never await here. Only on a
377        // successful `check()` AND a successful `send()` do we commit
378        // the URL to `seen_urls`; this guarantees retriability of any
379        // URL that lost the limiter race.
380        for url in candidates {
381            if self.discovery_limiter.check().is_err() {
382                tracing::warn!(
383                    url = %url,
384                    "discovery_rate_limited: dropped CDP URL beyond per-minute cap (will be retried on next handshake observing this URL)"
385                );
386                continue;
387            }
388            if self.discover_tx.send(url.clone()).is_err() {
389                // Receiver gone (shutdown). Do NOT mark as seen so the
390                // URL can be retried after a reload / restart.
391                tracing::debug!(
392                    url = %url,
393                    "discover channel closed; dropping CDP URL without marking seen"
394                );
395                continue;
396            }
397            // Admission succeeded: now safe to dedup permanently.
398            let mut guard = self
399                .seen_urls
400                .lock()
401                .unwrap_or_else(std::sync::PoisonError::into_inner);
402            if guard.len() >= self.config.crl_max_seen_urls {
403                self.warn_cap_exceeded_throttled("seen_urls");
404                break;
405            }
406            guard.insert(url);
407        }
408
409        if self.config.crl_deny_on_unavailable {
410            let cached = self
411                .cached_urls
412                .lock()
413                .ok()
414                .map(|guard| guard.clone())
415                .unwrap_or_default();
416            missing_cached = urls.iter().any(|url| !cached.contains(url));
417        }
418
419        missing_cached
420    }
421
422    /// Test helper for constructing a CRL set from in-memory CRLs.
423    ///
424    /// # Errors
425    ///
426    /// Returns an error if the verifier cannot be built from the provided CRLs.
427    #[doc(hidden)]
428    pub fn __test_with_prepopulated_crls(
429        roots: Arc<RootCertStore>,
430        config: MtlsConfig,
431        prefilled_crls: Vec<CertificateRevocationListDer<'static>>,
432    ) -> Result<Arc<Self>, McpxError> {
433        let (discover_tx, discover_rx) = mpsc::unbounded_channel();
434        drop(discover_rx);
435
436        let mut initial_cache = HashMap::new();
437        for (index, der) in prefilled_crls.into_iter().enumerate() {
438            let source_url = format!("memory://crl/{index}");
439            let (this_update, next_update) = parse_crl_metadata(der.as_ref())?;
440            initial_cache.insert(
441                source_url.clone(),
442                CachedCrl {
443                    der,
444                    this_update,
445                    next_update,
446                    fetched_at: SystemTime::now(),
447                    source_url,
448                },
449            );
450        }
451
452        Self::new(roots, config, discover_tx, initial_cache)
453    }
454
455    /// Test-only: same as [`Self::__test_with_prepopulated_crls`] but
456    /// returns the discover-channel receiver to the caller so the
457    /// background channel `send`s succeed (the receiver stays alive
458    /// for the duration of the test). Required by the B2 dedup
459    /// regression test, which must observe URLs being committed to
460    /// `seen_urls` after a successful limiter+send sequence. Not part
461    /// of the public API.
462    ///
463    /// # Errors
464    ///
465    /// Returns an error if the verifier cannot be built from the provided CRLs.
466    #[doc(hidden)]
467    pub fn __test_with_kept_receiver(
468        roots: Arc<RootCertStore>,
469        config: MtlsConfig,
470        prefilled_crls: Vec<CertificateRevocationListDer<'static>>,
471    ) -> Result<(Arc<Self>, mpsc::UnboundedReceiver<String>), McpxError> {
472        let (discover_tx, discover_rx) = mpsc::unbounded_channel();
473
474        let mut initial_cache = HashMap::new();
475        for (index, der) in prefilled_crls.into_iter().enumerate() {
476            let source_url = format!("memory://crl/{index}");
477            let (this_update, next_update) = parse_crl_metadata(der.as_ref())?;
478            initial_cache.insert(
479                source_url.clone(),
480                CachedCrl {
481                    der,
482                    this_update,
483                    next_update,
484                    fetched_at: SystemTime::now(),
485                    source_url,
486                },
487            );
488        }
489
490        let crl_set = Self::new(roots, config, discover_tx, initial_cache)?;
491        Ok((crl_set, discover_rx))
492    }
493
494    /// Test-only: directly invoke the discovery rate-limiter on a batch of URLs
495    /// and return `(accepted, dropped)`. Bypasses the dedup `seen_urls` set so
496    /// callers can deterministically saturate the limiter; mutates the limiter
497    /// state in place. Not part of the public API.
498    #[doc(hidden)]
499    pub fn __test_check_discovery_rate(&self, urls: &[String]) -> (usize, usize) {
500        let mut accepted = 0usize;
501        let mut dropped = 0usize;
502        for url in urls {
503            if self.discovery_limiter.check().is_ok() {
504                let _ = self.discover_tx.send(url.clone());
505                accepted += 1;
506            } else {
507                dropped += 1;
508            }
509        }
510        (accepted, dropped)
511    }
512
513    /// Test-only: invoke the real `note_discovered_urls` so dedup + rate-limit
514    /// + cached-fallback paths are all exercised. Returns the `missing_cached`
515    /// flag the production verifier uses to decide whether to fail the handshake.
516    #[doc(hidden)]
517    pub fn __test_note_discovered_urls(&self, urls: &[String]) -> bool {
518        let missing_cached = self.note_discovered_urls(urls);
519        if self.discover_tx.is_closed() {
520            match self.seen_urls.lock() {
521                Ok(mut guard) => {
522                    for url in urls {
523                        if guard.contains(url) {
524                            continue;
525                        }
526                        if guard.len() >= self.config.crl_max_seen_urls {
527                            self.warn_cap_exceeded_throttled("seen_urls");
528                            break;
529                        }
530                        guard.insert(url.clone());
531                    }
532                }
533                Err(poisoned) => {
534                    let mut guard = poisoned.into_inner();
535                    for url in urls {
536                        if guard.contains(url) {
537                            continue;
538                        }
539                        if guard.len() >= self.config.crl_max_seen_urls {
540                            self.warn_cap_exceeded_throttled("seen_urls");
541                            break;
542                        }
543                        guard.insert(url.clone());
544                    }
545                }
546            }
547        }
548        missing_cached
549    }
550
551    /// Test-only: report whether a URL has been promoted to the
552    /// permanent dedup set. Used by the B2 retriability regression
553    /// test to assert that rate-limited URLs are NOT marked seen.
554    /// Not part of the public API.
555    #[doc(hidden)]
556    pub fn __test_is_seen(&self, url: &str) -> bool {
557        match self.seen_urls.lock() {
558            Ok(seen) => seen.contains(url),
559            Err(_) => false,
560        }
561    }
562
563    /// Test-only: current count of host semaphores. Used by
564    /// `tests/crl_map_bounds.rs` to assert the cap is enforced.
565    #[cfg(any(test, feature = "test-helpers"))]
566    #[doc(hidden)]
567    pub fn __test_host_semaphore_count(&self) -> usize {
568        self.host_semaphores
569            .try_lock()
570            .map_or(0, |guard| guard.len())
571    }
572
573    /// Test-only: current number of entries in the CRL cache.
574    #[cfg(any(test, feature = "test-helpers"))]
575    #[doc(hidden)]
576    pub fn __test_cache_len(&self) -> usize {
577        self.cache.try_read().map_or(0, |guard| guard.len())
578    }
579
580    /// Test-only: whether a specific URL is currently cached.
581    #[cfg(any(test, feature = "test-helpers"))]
582    #[doc(hidden)]
583    pub fn __test_cache_contains(&self, url: &str) -> bool {
584        self.cache
585            .try_read()
586            .is_ok_and(|guard| guard.contains_key(url))
587    }
588
589    /// Test-only: triggers the request-hot-path fetch path for `url`
590    /// WITHOUT going through the TLS handshake. Returns any error the
591    /// host-semaphore cap check produces. A network-unreachable
592    /// failure for the fetch itself is treated as `Ok(())` (test only
593    /// cares about the cap; real tests use mock hosts that won't
594    /// resolve — the cap must fire BEFORE network I/O).
595    #[cfg(any(test, feature = "test-helpers"))]
596    #[doc(hidden)]
597    pub async fn __test_trigger_fetch(&self, url: &str) -> Result<(), McpxError> {
598        if let Err(error) = gated_fetch(
599            &self.client,
600            &self.global_fetch_sem,
601            &self.host_semaphores,
602            url,
603            self.config.crl_allow_http,
604            self.max_response_bytes,
605            self.config.crl_max_host_semaphores,
606        )
607        .await
608        {
609            if error
610                .to_string()
611                .contains("crl_host_semaphore_cap_exceeded")
612            {
613                Err(error)
614            } else {
615                Ok(())
616            }
617        } else {
618            Ok(())
619        }
620    }
621
622    /// Test-only: directly insert `cached` under `url` into both
623    /// `cache` and `cached_urls`, bypassing HTTP. Does NOT enforce
624    /// `crl_max_cache_entries` when called pre-cap — the test uses it
625    /// to stage preconditions. For cap-breach coverage, tests invoke
626    /// the real production insertion path.
627    ///
628    /// Wait — the `cache_hard_cap_drops_newest` test DOES use this
629    /// helper to assert the cap fires. Therefore this helper MUST
630    /// enforce the hard cap (silent drop with warn!) the same way the
631    /// production code does. The helper is a thin wrapper around the
632    /// same internal insertion fn the production path uses.
633    #[cfg(any(test, feature = "test-helpers"))]
634    #[doc(hidden)]
635    pub async fn __test_insert_cache(&self, url: &str, cached: CachedCrl) {
636        let _ = self.insert_cache_entry(url.to_owned(), cached).await;
637    }
638
639    /// Test-only: trigger a refresh cycle for a single URL. Exercises
640    /// the same stale-grace / fetch-failure path as `refresh_urls()`.
641    /// Returns the refresh error (if any) — most tests ignore it
642    /// because they assert post-state, not the transient error.
643    #[cfg(any(test, feature = "test-helpers"))]
644    #[doc(hidden)]
645    pub async fn __test_trigger_refresh_url(&self, url: &str) -> Result<(), McpxError> {
646        self.refresh_urls(vec![url.to_owned()]).await
647    }
648
649    async fn fetch_url_results(
650        &self,
651        urls: Vec<String>,
652    ) -> Vec<(String, Result<CachedCrl, McpxError>)> {
653        let mut tasks = JoinSet::new();
654        for url in urls {
655            let client = self.client.clone();
656            let global_sem = Arc::clone(&self.global_fetch_sem);
657            let host_map = Arc::clone(&self.host_semaphores);
658            let allow_http = self.config.crl_allow_http;
659            let max_bytes = self.max_response_bytes;
660            let max_host_semaphores = self.config.crl_max_host_semaphores;
661            tasks.spawn(async move {
662                let result = gated_fetch(
663                    &client,
664                    &global_sem,
665                    &host_map,
666                    &url,
667                    allow_http,
668                    max_bytes,
669                    max_host_semaphores,
670                )
671                .await;
672                (url, result)
673            });
674        }
675
676        let mut results = Vec::new();
677        while let Some(joined) = tasks.join_next().await {
678            match joined {
679                Ok(result) => results.push(result),
680                Err(error) => {
681                    tracing::warn!(error = %error, "CRL refresh task join failed");
682                }
683            }
684        }
685
686        results
687    }
688
689    fn swap_verifier_from_cache(
690        &self,
691        cache: &impl std::ops::Deref<Target = HashMap<String, CachedCrl>>,
692    ) -> Result<(), McpxError> {
693        let verifier = rebuild_verifier(&self.roots, &self.config, cache)?;
694        self.inner_verifier
695            .store(Arc::new(VerifierHandle(verifier)));
696        Ok(())
697    }
698}
699
700impl CachedCrl {
701    /// Test-only: synthesize a cache entry that looks valid, `next_update`
702    /// = now + 24h. Fields used only to populate the HashMap — the bytes
703    /// are a minimal CRL-shape that won't be parsed by tests.
704    #[cfg(any(test, feature = "test-helpers"))]
705    #[doc(hidden)]
706    #[must_use]
707    pub fn __test_synthetic(now: SystemTime) -> Self {
708        Self {
709            der: CertificateRevocationListDer::from(vec![0x30, 0x00]),
710            this_update: now,
711            next_update: now.checked_add(Duration::from_hours(24)),
712            fetched_at: now,
713            source_url: "test://synthetic".to_owned(),
714        }
715    }
716
717    /// Test-only: synthesize a STALE cache entry (`next_update` in the
718    /// deep past so `is_stale_beyond_grace` fires with any sensible
719    /// `crl_stale_grace`).
720    #[cfg(any(test, feature = "test-helpers"))]
721    #[doc(hidden)]
722    #[must_use]
723    pub fn __test_stale(reference_past: SystemTime) -> Self {
724        Self {
725            der: CertificateRevocationListDer::from(vec![0x30, 0x00]),
726            this_update: reference_past,
727            next_update: Some(reference_past),
728            fetched_at: reference_past,
729            source_url: "test://stale".to_owned(),
730        }
731    }
732}
733
734/// Stable outer verifier that delegates all TLS verification behavior to the
735/// atomically swappable inner verifier.
736pub struct DynamicClientCertVerifier {
737    inner: Arc<CrlSet>,
738    dn_subjects: Vec<DistinguishedName>,
739}
740
741impl DynamicClientCertVerifier {
742    /// Construct a new dynamic verifier from a shared [`CrlSet`].
743    #[must_use]
744    pub fn new(inner: Arc<CrlSet>) -> Self {
745        Self {
746            dn_subjects: inner.roots.subjects(),
747            inner,
748        }
749    }
750}
751
752impl std::fmt::Debug for DynamicClientCertVerifier {
753    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
754        f.debug_struct("DynamicClientCertVerifier")
755            .field("dn_subjects_len", &self.dn_subjects.len())
756            .finish_non_exhaustive()
757    }
758}
759
760impl ClientCertVerifier for DynamicClientCertVerifier {
761    fn offer_client_auth(&self) -> bool {
762        let verifier = self.inner.inner_verifier.load();
763        verifier.0.offer_client_auth()
764    }
765
766    fn client_auth_mandatory(&self) -> bool {
767        let verifier = self.inner.inner_verifier.load();
768        verifier.0.client_auth_mandatory()
769    }
770
771    fn root_hint_subjects(&self) -> &[DistinguishedName] {
772        &self.dn_subjects
773    }
774
775    fn verify_client_cert(
776        &self,
777        end_entity: &CertificateDer<'_>,
778        intermediates: &[CertificateDer<'_>],
779        now: UnixTime,
780    ) -> Result<ClientCertVerified, TlsError> {
781        let mut discovered =
782            extract_cdp_urls(end_entity.as_ref(), self.inner.config.crl_allow_http);
783        for intermediate in intermediates {
784            discovered.extend(extract_cdp_urls(
785                intermediate.as_ref(),
786                self.inner.config.crl_allow_http,
787            ));
788        }
789        discovered.sort();
790        discovered.dedup();
791
792        if self.inner.note_discovered_urls(&discovered) {
793            return Err(TlsError::General(
794                "client certificate revocation status unavailable".to_owned(),
795            ));
796        }
797
798        let verifier = self.inner.inner_verifier.load();
799        verifier
800            .0
801            .verify_client_cert(end_entity, intermediates, now)
802    }
803
804    fn verify_tls12_signature(
805        &self,
806        message: &[u8],
807        cert: &CertificateDer<'_>,
808        dss: &DigitallySignedStruct,
809    ) -> Result<HandshakeSignatureValid, TlsError> {
810        let verifier = self.inner.inner_verifier.load();
811        verifier.0.verify_tls12_signature(message, cert, dss)
812    }
813
814    fn verify_tls13_signature(
815        &self,
816        message: &[u8],
817        cert: &CertificateDer<'_>,
818        dss: &DigitallySignedStruct,
819    ) -> Result<HandshakeSignatureValid, TlsError> {
820        let verifier = self.inner.inner_verifier.load();
821        verifier.0.verify_tls13_signature(message, cert, dss)
822    }
823
824    fn supported_verify_schemes(&self) -> Vec<SignatureScheme> {
825        let verifier = self.inner.inner_verifier.load();
826        verifier.0.supported_verify_schemes()
827    }
828
829    fn requires_raw_public_keys(&self) -> bool {
830        let verifier = self.inner.inner_verifier.load();
831        verifier.0.requires_raw_public_keys()
832    }
833}
834
835/// Extract CRL Distribution Point URLs from a DER-encoded certificate.
836///
837/// URLs are validated with `url::Url::parse` (case-insensitive scheme handling)
838/// and filtered through an internal scheme guard. Malformed URLs and URLs
839/// using disallowed schemes are silently dropped. SSRF defenses against private
840/// IP literals and metadata endpoints are applied later, at fetch time, after
841/// DNS resolution.
842#[must_use]
843pub fn extract_cdp_urls(cert_der: &[u8], allow_http: bool) -> Vec<String> {
844    let Ok((_, cert)) = X509Certificate::from_der(cert_der) else {
845        return Vec::new();
846    };
847
848    let mut urls = Vec::new();
849    for ext in cert.extensions() {
850        if let ParsedExtension::CRLDistributionPoints(cdps) = ext.parsed_extension() {
851            for point in cdps.iter() {
852                if let Some(DistributionPointName::FullName(names)) = &point.distribution_point {
853                    for name in names {
854                        if let GeneralName::URI(uri) = name {
855                            let raw = *uri;
856                            let Ok(parsed) = Url::parse(raw) else {
857                                tracing::debug!(url = %raw, "CDP URL parse failed; dropped");
858                                continue;
859                            };
860                            if let Err(reason) = check_scheme(&parsed, allow_http) {
861                                tracing::debug!(
862                                    url = %raw,
863                                    reason,
864                                    "CDP URL rejected by scheme guard; dropped"
865                                );
866                                continue;
867                            }
868                            urls.push(parsed.into());
869                        }
870                    }
871                }
872            }
873        }
874    }
875
876    urls
877}
878
879/// Bootstrap the CRL cache by extracting CDP URLs from the CA chain and
880/// fetching any reachable CRLs with a 10-second total deadline.
881///
882/// # Errors
883///
884/// Returns an error if the initial verifier cannot be built.
885#[allow(
886    clippy::cognitive_complexity,
887    reason = "bootstrap coordinates timeout, parallel fetches, and partial-cache recovery"
888)]
889pub async fn bootstrap_fetch(
890    roots: Arc<RootCertStore>,
891    ca_certs: &[CertificateDer<'static>],
892    config: MtlsConfig,
893) -> Result<(Arc<CrlSet>, mpsc::UnboundedReceiver<String>), McpxError> {
894    let (discover_tx, discover_rx) = mpsc::unbounded_channel();
895
896    let mut urls = ca_certs
897        .iter()
898        .flat_map(|cert| extract_cdp_urls(cert.as_ref(), config.crl_allow_http))
899        .collect::<Vec<_>>();
900    urls.sort();
901    urls.dedup();
902
903    let client = reqwest::Client::builder()
904        .timeout(config.crl_fetch_timeout)
905        .connect_timeout(CRL_CONNECT_TIMEOUT)
906        .tcp_keepalive(None)
907        .redirect(reqwest::redirect::Policy::none())
908        .user_agent(format!("rmcp-server-kit/{}", env!("CARGO_PKG_VERSION")))
909        .build()
910        .map_err(|error| McpxError::Startup(format!("CRL HTTP client init: {error}")))?;
911
912    // Bootstrap shares the same global concurrency + per-host cap as the
913    // hot-path verifier so a maliciously broad CA chain cannot overwhelm
914    // the network at startup.
915    let bootstrap_concurrency = config.crl_max_concurrent_fetches.max(1);
916    let global_sem = Arc::new(Semaphore::new(bootstrap_concurrency));
917    let host_semaphores = Arc::new(tokio::sync::Mutex::new(HashMap::new()));
918    let allow_http = config.crl_allow_http;
919    let max_bytes = config.crl_max_response_bytes;
920    let max_host_semaphores = config.crl_max_host_semaphores;
921
922    let mut initial_cache = HashMap::new();
923    let mut tasks = JoinSet::new();
924    for url in &urls {
925        let client = client.clone();
926        let url = url.clone();
927        let global_sem = Arc::clone(&global_sem);
928        let host_semaphores = Arc::clone(&host_semaphores);
929        tasks.spawn(async move {
930            let result = gated_fetch(
931                &client,
932                &global_sem,
933                &host_semaphores,
934                &url,
935                allow_http,
936                max_bytes,
937                max_host_semaphores,
938            )
939            .await;
940            (url, result)
941        });
942    }
943
944    let timeout: Sleep = tokio::time::sleep(BOOTSTRAP_TIMEOUT);
945    tokio::pin!(timeout);
946
947    while !tasks.is_empty() {
948        tokio::select! {
949            () = &mut timeout => {
950                tracing::warn!("CRL bootstrap timed out after {:?}", BOOTSTRAP_TIMEOUT);
951                break;
952            }
953            maybe_joined = tasks.join_next() => {
954                let Some(joined) = maybe_joined else {
955                    break;
956                };
957                match joined {
958                    Ok((url, Ok(cached))) => {
959                        initial_cache.insert(url, cached);
960                    }
961                    Ok((url, Err(error))) => {
962                        tracing::warn!(url = %url, error = %error, "CRL bootstrap fetch failed");
963                    }
964                    Err(error) => {
965                        tracing::warn!(error = %error, "CRL bootstrap task join failed");
966                    }
967                }
968            }
969        }
970    }
971
972    let set = CrlSet::new(roots, config, discover_tx, initial_cache)?;
973    Ok((set, discover_rx))
974}
975
976/// Run the CRL refresher loop until shutdown.
977#[allow(
978    clippy::cognitive_complexity,
979    reason = "refresher loop intentionally handles shutdown, timer, and discovery in one select"
980)]
981pub async fn run_crl_refresher(
982    set: Arc<CrlSet>,
983    mut discover_rx: mpsc::UnboundedReceiver<String>,
984    shutdown: CancellationToken,
985) {
986    let mut refresh_sleep = schedule_next_refresh(&set).await;
987
988    loop {
989        tokio::select! {
990            () = shutdown.cancelled() => {
991                break;
992            }
993            () = &mut refresh_sleep => {
994                if let Err(error) = set.refresh_due_urls().await {
995                    tracing::warn!(error = %error, "CRL periodic refresh failed");
996                }
997                refresh_sleep = schedule_next_refresh(&set).await;
998            }
999            maybe_url = discover_rx.recv() => {
1000                let Some(url) = maybe_url else {
1001                    break;
1002                };
1003                if let Err(error) = set.fetch_and_store_url(url.clone()).await {
1004                    tracing::warn!(url = %url, error = %error, "CRL discovery fetch failed");
1005                }
1006                refresh_sleep = schedule_next_refresh(&set).await;
1007            }
1008        }
1009    }
1010}
1011
1012/// Rebuild the inner rustls verifier from the current CRL cache.
1013///
1014/// # Errors
1015///
1016/// Returns an error if rustls rejects the verifier configuration.
1017pub fn rebuild_verifier<S: std::hash::BuildHasher>(
1018    roots: &Arc<RootCertStore>,
1019    config: &MtlsConfig,
1020    cache: &HashMap<String, CachedCrl, S>,
1021) -> Result<Arc<dyn ClientCertVerifier>, McpxError> {
1022    let mut builder = WebPkiClientVerifier::builder(Arc::clone(roots));
1023
1024    if !cache.is_empty() {
1025        let crls = cache
1026            .values()
1027            .map(|cached| cached.der.clone())
1028            .collect::<Vec<_>>();
1029        builder = builder.with_crls(crls);
1030    }
1031    if config.crl_end_entity_only {
1032        builder = builder.only_check_end_entity_revocation();
1033    }
1034    if !config.crl_deny_on_unavailable {
1035        builder = builder.allow_unknown_revocation_status();
1036    }
1037    if config.crl_enforce_expiration {
1038        builder = builder.enforce_revocation_expiration();
1039    }
1040    if !config.required {
1041        builder = builder.allow_unauthenticated();
1042    }
1043
1044    builder
1045        .build()
1046        .map_err(|error| McpxError::Tls(format!("mTLS verifier error: {error}")))
1047}
1048
1049/// Parse `thisUpdate` and `nextUpdate` metadata from a DER-encoded CRL.
1050///
1051/// # Errors
1052///
1053/// Returns an error if the CRL cannot be parsed.
1054pub fn parse_crl_metadata(der: &[u8]) -> Result<(SystemTime, Option<SystemTime>), McpxError> {
1055    let (_, crl) = CertificateRevocationList::from_der(der)
1056        .map_err(|error| McpxError::Tls(format!("invalid CRL DER: {error:?}")))?;
1057
1058    Ok((
1059        asn1_time_to_system_time(crl.last_update()),
1060        crl.next_update().map(asn1_time_to_system_time),
1061    ))
1062}
1063
1064async fn schedule_next_refresh(set: &CrlSet) -> Pin<Box<Sleep>> {
1065    let duration = next_refresh_delay(set).await;
1066    boxed_sleep(duration)
1067}
1068
1069fn boxed_sleep(duration: Duration) -> Pin<Box<Sleep>> {
1070    Box::pin(tokio::time::sleep_until(Instant::now() + duration))
1071}
1072
1073async fn next_refresh_delay(set: &CrlSet) -> Duration {
1074    if let Some(interval) = set.config.crl_refresh_interval {
1075        return clamp_refresh(interval);
1076    }
1077
1078    let now = SystemTime::now();
1079    let cache = set.cache.read().await;
1080    let mut next = MAX_AUTO_REFRESH;
1081
1082    for cached in cache.values() {
1083        if let Some(next_update) = cached.next_update {
1084            let duration = next_update.duration_since(now).unwrap_or(Duration::ZERO);
1085            next = next.min(clamp_refresh(duration));
1086        }
1087    }
1088
1089    next
1090}
1091
1092/// Fetch a single CRL URL through the global + per-host concurrency caps.
1093///
1094/// `global_sem` caps total simultaneous CRL fetches process-wide.
1095/// `host_semaphores` ensures at most one in-flight fetch per origin host
1096/// (an SSRF amplification defense). Both permits are dropped when the
1097/// returned future completes (whether `Ok` or `Err`).
1098async fn gated_fetch(
1099    client: &reqwest::Client,
1100    global_sem: &Arc<Semaphore>,
1101    host_semaphores: &Arc<tokio::sync::Mutex<HashMap<String, Arc<Semaphore>>>>,
1102    url: &str,
1103    allow_http: bool,
1104    max_bytes: u64,
1105    max_host_semaphores: usize,
1106) -> Result<CachedCrl, McpxError> {
1107    let host_key = Url::parse(url)
1108        .ok()
1109        .and_then(|u| u.host_str().map(str::to_owned))
1110        .unwrap_or_else(|| url.to_owned());
1111
1112    let host_sem = {
1113        let mut map = host_semaphores.lock().await;
1114        if !map.contains_key(&host_key) {
1115            if map.len() >= max_host_semaphores {
1116                return Err(McpxError::Config(
1117                    "crl_host_semaphore_cap_exceeded: too many distinct CRL hosts in flight"
1118                        .to_owned(),
1119                ));
1120            }
1121            map.insert(host_key.clone(), Arc::new(Semaphore::new(1)));
1122        }
1123        match map.get(&host_key) {
1124            Some(semaphore) => Arc::clone(semaphore),
1125            None => {
1126                return Err(McpxError::Tls(
1127                    "CRL host semaphore missing after insertion".to_owned(),
1128                ));
1129            }
1130        }
1131    };
1132
1133    let _global_permit = Arc::clone(global_sem)
1134        .acquire_owned()
1135        .await
1136        .map_err(|error| McpxError::Tls(format!("CRL global semaphore closed: {error}")))?;
1137    let _host_permit = host_sem
1138        .acquire_owned()
1139        .await
1140        .map_err(|error| McpxError::Tls(format!("CRL host semaphore closed: {error}")))?;
1141
1142    fetch_crl(client, url, allow_http, max_bytes).await
1143}
1144
1145async fn fetch_crl(
1146    client: &reqwest::Client,
1147    url: &str,
1148    allow_http: bool,
1149    max_bytes: u64,
1150) -> Result<CachedCrl, McpxError> {
1151    let parsed =
1152        Url::parse(url).map_err(|error| McpxError::Tls(format!("CRL URL parse {url}: {error}")))?;
1153
1154    if let Err(reason) = check_scheme(&parsed, allow_http) {
1155        tracing::warn!(url = %url, reason, "CRL fetch denied: scheme");
1156        return Err(McpxError::Tls(format!(
1157            "CRL scheme rejected ({reason}): {url}"
1158        )));
1159    }
1160
1161    let host = parsed
1162        .host_str()
1163        .ok_or_else(|| McpxError::Tls(format!("CRL URL has no host: {url}")))?;
1164    let port = parsed
1165        .port_or_known_default()
1166        .ok_or_else(|| McpxError::Tls(format!("CRL URL has no known port: {url}")))?;
1167
1168    let addrs = lookup_host((host, port))
1169        .await
1170        .map_err(|error| McpxError::Tls(format!("CRL DNS resolution {url}: {error}")))?;
1171
1172    let mut any_addr = false;
1173    for addr in addrs {
1174        any_addr = true;
1175        if let Some(reason) = ip_block_reason(addr.ip()) {
1176            tracing::warn!(
1177                url = %url,
1178                resolved_ip = %addr.ip(),
1179                reason,
1180                "CRL fetch denied: blocked IP"
1181            );
1182            return Err(McpxError::Tls(format!(
1183                "CRL host resolved to blocked IP ({reason}): {url}"
1184            )));
1185        }
1186    }
1187    if !any_addr {
1188        return Err(McpxError::Tls(format!(
1189            "CRL DNS resolution returned no addresses: {url}"
1190        )));
1191    }
1192
1193    let mut response = client
1194        .get(url)
1195        .send()
1196        .await
1197        .map_err(|error| McpxError::Tls(format!("CRL fetch {url}: {error}")))?
1198        .error_for_status()
1199        .map_err(|error| McpxError::Tls(format!("CRL fetch {url}: {error}")))?;
1200
1201    // Enforce body cap by streaming chunk-by-chunk; a malicious or
1202    // misconfigured server cannot allocate more than `max_bytes` of memory.
1203    let initial_capacity = usize::try_from(max_bytes.min(64 * 1024)).unwrap_or(64 * 1024);
1204    let mut body: Vec<u8> = Vec::with_capacity(initial_capacity);
1205    while let Some(chunk) = response
1206        .chunk()
1207        .await
1208        .map_err(|error| McpxError::Tls(format!("CRL read {url}: {error}")))?
1209    {
1210        let chunk_len = u64::try_from(chunk.len()).unwrap_or(u64::MAX);
1211        let body_len = u64::try_from(body.len()).unwrap_or(u64::MAX);
1212        if body_len.saturating_add(chunk_len) > max_bytes {
1213            return Err(McpxError::Tls(format!(
1214                "CRL body exceeded cap of {max_bytes} bytes: {url}"
1215            )));
1216        }
1217        body.extend_from_slice(&chunk);
1218    }
1219
1220    let der = CertificateRevocationListDer::from(body);
1221    let (this_update, next_update) = parse_crl_metadata(der.as_ref())?;
1222
1223    Ok(CachedCrl {
1224        der,
1225        this_update,
1226        next_update,
1227        fetched_at: SystemTime::now(),
1228        source_url: url.to_owned(),
1229    })
1230}
1231
1232fn should_refresh_cached(
1233    cached: &CachedCrl,
1234    now: SystemTime,
1235    fixed_interval: Option<Duration>,
1236) -> bool {
1237    if let Some(interval) = fixed_interval {
1238        return cached
1239            .fetched_at
1240            .checked_add(clamp_refresh(interval))
1241            .is_none_or(|deadline| now >= deadline);
1242    }
1243
1244    cached
1245        .next_update
1246        .is_none_or(|next_update| now >= next_update)
1247}
1248
1249fn clamp_refresh(duration: Duration) -> Duration {
1250    duration.clamp(MIN_AUTO_REFRESH, MAX_AUTO_REFRESH)
1251}
1252
1253fn asn1_time_to_system_time(time: x509_parser::time::ASN1Time) -> SystemTime {
1254    let timestamp = time.timestamp();
1255    if timestamp >= 0 {
1256        let seconds = u64::try_from(timestamp).unwrap_or(0);
1257        UNIX_EPOCH + Duration::from_secs(seconds)
1258    } else {
1259        UNIX_EPOCH - Duration::from_secs(timestamp.unsigned_abs())
1260    }
1261}