Skip to main content

rmcp_server_kit/
mtls_revocation.rs

1//! CDP-driven CRL revocation support for mTLS.
2//!
3//! When mTLS is configured with CRL checks enabled, startup performs a bounded
4//! bootstrap pass over the configured CA bundle, extracts CRL Distribution
5//! Point (CDP) URLs, fetches reachable CRLs, and builds the initial inner
6//! `rustls` verifier from that cache.
7//!
8//! During handshakes, the outer verifier remains stable for the lifetime of the
9//! TLS acceptor while its inner `WebPkiClientVerifier` is swapped atomically via
10//! `ArcSwap` as CRLs are discovered or refreshed. Discovery from connecting
11//! client certificates is fire-and-forget and never blocks the synchronous
12//! handshake path.
13//!
14//! Security note: CDP URLs are extracted from attacker-controllable client
15//! certs *before* chain validation. This is safe by design; see the
16//! `// SECURITY:` comment on `DynamicClientCertVerifier::verify_client_cert`
17//! for the full rationale before changing the discovery ordering.
18//!
19//! Semantics:
20//! - `crl_deny_on_unavailable = false` => fail open with warn logs.
21//! - `crl_deny_on_unavailable = true` => fail closed when a certificate
22//!   advertises CDP URLs whose revocation status is not yet available.
23
24use std::{
25    collections::{HashMap, HashSet},
26    num::NonZeroU32,
27    pin::Pin,
28    sync::{Arc, Mutex},
29    time::{Duration, SystemTime, UNIX_EPOCH},
30};
31
32use arc_swap::ArcSwap;
33use governor::{DefaultDirectRateLimiter, Quota, RateLimiter};
34use rustls::{
35    DigitallySignedStruct, DistinguishedName, Error as TlsError, RootCertStore, SignatureScheme,
36    client::danger::HandshakeSignatureValid,
37    pki_types::{CertificateDer, CertificateRevocationListDer, UnixTime},
38    server::{
39        WebPkiClientVerifier,
40        danger::{ClientCertVerified, ClientCertVerifier},
41    },
42};
43use tokio::{
44    net::lookup_host,
45    sync::{RwLock, Semaphore, mpsc},
46    task::JoinSet,
47    time::{Instant, Sleep},
48};
49use tokio_util::sync::CancellationToken;
50use url::Url;
51use x509_parser::{
52    extensions::{DistributionPointName, GeneralName, ParsedExtension},
53    prelude::{FromDer, X509Certificate},
54    revocation_list::CertificateRevocationList,
55};
56
57use crate::{
58    auth::MtlsConfig,
59    error::McpxError,
60    ssrf::{check_scheme, ip_block_reason, sanitized_url_for_log},
61};
62
63const BOOTSTRAP_TIMEOUT: Duration = Duration::from_secs(10);
64const MIN_AUTO_REFRESH: Duration = Duration::from_mins(10);
65const MAX_AUTO_REFRESH: Duration = Duration::from_hours(24);
66/// Connection timeout for CRL HTTP fetches. Independent of overall fetch
67/// timeout to bound time spent on unreachable hosts.
68const CRL_CONNECT_TIMEOUT: Duration = Duration::from_secs(3);
69
70/// Parsed CRL cached in memory and keyed by its source URL.
71#[derive(Clone, Debug)]
72#[non_exhaustive]
73pub struct CachedCrl {
74    /// DER bytes for the CRL.
75    pub der: CertificateRevocationListDer<'static>,
76    /// `thisUpdate` field from the CRL.
77    pub this_update: SystemTime,
78    /// `nextUpdate` field from the CRL, if present.
79    pub next_update: Option<SystemTime>,
80    /// Time the server fetched this CRL.
81    pub fetched_at: SystemTime,
82    /// Source URL used for retrieval.
83    pub source_url: String,
84}
85
86pub(crate) struct VerifierHandle(pub Arc<dyn ClientCertVerifier>);
87
88impl std::fmt::Debug for VerifierHandle {
89    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
90        f.debug_struct("VerifierHandle").finish_non_exhaustive()
91    }
92}
93
94/// Shared CRL state backing the dynamic mTLS verifier.
95#[allow(
96    missing_debug_implementations,
97    reason = "contains ArcSwap and dyn verifier internals"
98)]
99#[non_exhaustive]
100pub struct CrlSet {
101    inner_verifier: ArcSwap<VerifierHandle>,
102    /// Cached CRLs keyed by URL.
103    pub cache: RwLock<HashMap<String, CachedCrl>>,
104    /// Immutable client-auth root store.
105    pub roots: Arc<RootCertStore>,
106    /// mTLS CRL configuration.
107    pub config: MtlsConfig,
108    /// Fire-and-forget discovery channel for newly-seen CDP URLs.
109    pub discover_tx: mpsc::UnboundedSender<String>,
110    client: reqwest::Client,
111    seen_urls: Mutex<HashSet<String>>,
112    cached_urls: Mutex<HashSet<String>>,
113    /// Global cap on simultaneous CRL HTTP fetches (SSRF amplification guard).
114    global_fetch_sem: Arc<Semaphore>,
115    /// Per-host serializer (one in-flight fetch per origin host). Bounded
116    /// by `crl_max_host_semaphores`; at the cap, idle entries are evicted
117    /// on demand (see [`acquire_host_semaphore`]), so the cap only rejects
118    /// genuinely concurrent fetch floods and is never a permanent lockout.
119    host_semaphores: Arc<tokio::sync::Mutex<HashMap<String, Arc<Semaphore>>>>,
120    /// Global rate-limiter on discovery URL submissions; protects against
121    /// cert-driven URL flooding by a malicious mTLS peer.
122    ///
123    /// Note: this ships as a process-global limiter; per-source-IP scoping
124    /// is deferred to a future release because the rustls
125    /// `verify_client_cert` callback does not carry a `SocketAddr` for the
126    /// peer. This is a CRL-discovery limiter in the TLS verifier path —
127    /// distinct from the bearer pre-auth limiter (`AuthState`), which is
128    /// already keyed per-IP via a bounded keyed governor and lives in the
129    /// ordinary request middleware path.
130    discovery_limiter: Arc<DefaultDirectRateLimiter>,
131    /// Cached cap on per-fetch response body size; copied from `config` so the
132    /// hot path doesn't re-read the (rarely changing) config struct.
133    max_response_bytes: u64,
134    last_cap_warn: Mutex<HashMap<&'static str, Instant>>,
135}
136
137impl CrlSet {
138    fn new(
139        roots: Arc<RootCertStore>,
140        config: MtlsConfig,
141        discover_tx: mpsc::UnboundedSender<String>,
142        initial_cache: HashMap<String, CachedCrl>,
143    ) -> Result<Arc<Self>, McpxError> {
144        // M-H2: install the SSRF screening resolver on the CRL fetcher.
145        // CRL CDP URLs come from attacker-controllable client certs and
146        // their hosts are re-resolved per fetch -- exactly the TOCTOU
147        // class M-H2 closes. The allowlist is empty (default-strict),
148        // matching the existing CRL pre-flight posture; operators who
149        // need internal CDPs would extend this with the same
150        // CompiledSsrfAllowlist plumbing used by oauth.
151        let allowlist = Arc::new(crate::ssrf::CompiledSsrfAllowlist::default());
152        let resolver: Arc<dyn reqwest::dns::Resolve> =
153            Arc::new(crate::ssrf_resolver::SsrfScreeningResolver::new(
154                Arc::clone(&allowlist),
155                #[cfg(any(test, feature = "test-helpers"))]
156                Arc::new(std::sync::atomic::AtomicBool::new(false)),
157                #[cfg(not(any(test, feature = "test-helpers")))]
158                (),
159            ));
160
161        let client = reqwest::Client::builder()
162            // M-H2/N1: see oauth.rs::OauthHttpClient::build for rationale.
163            .no_proxy()
164            .dns_resolver(Arc::clone(&resolver))
165            .timeout(config.crl_fetch_timeout)
166            .connect_timeout(CRL_CONNECT_TIMEOUT)
167            .tcp_keepalive(None)
168            .redirect(reqwest::redirect::Policy::none())
169            .user_agent(format!("rmcp-server-kit/{}", env!("CARGO_PKG_VERSION")))
170            .build()
171            .map_err(|error| McpxError::Startup(format!("CRL HTTP client init: {error}")))?;
172
173        let initial_verifier = rebuild_verifier(&roots, &config, &initial_cache)?;
174        let seen_urls = initial_cache.keys().cloned().collect::<HashSet<_>>();
175        let cached_urls = seen_urls.clone();
176
177        let concurrency = config.crl_max_concurrent_fetches.max(1);
178        let global_fetch_sem = Arc::new(Semaphore::new(concurrency));
179        let host_semaphores = Arc::new(tokio::sync::Mutex::new(HashMap::new()));
180
181        let rate =
182            NonZeroU32::new(config.crl_discovery_rate_per_min.max(1)).unwrap_or(NonZeroU32::MIN);
183        let discovery_limiter = Arc::new(RateLimiter::direct(Quota::per_minute(rate)));
184
185        let max_response_bytes = config.crl_max_response_bytes;
186
187        Ok(Arc::new(Self {
188            inner_verifier: ArcSwap::from_pointee(VerifierHandle(initial_verifier)),
189            cache: RwLock::new(initial_cache),
190            roots,
191            config,
192            discover_tx,
193            client,
194            seen_urls: Mutex::new(seen_urls),
195            cached_urls: Mutex::new(cached_urls),
196            global_fetch_sem,
197            host_semaphores,
198            discovery_limiter,
199            max_response_bytes,
200            last_cap_warn: Mutex::new(HashMap::new()),
201        }))
202    }
203
204    fn warn_cap_exceeded_throttled(&self, which: &'static str) {
205        let now = Instant::now();
206        let cooldown = Duration::from_mins(1);
207        let should_warn = match self.last_cap_warn.lock() {
208            Ok(mut guard) => {
209                let should_emit = guard
210                    .get(which)
211                    .is_none_or(|last| now.saturating_duration_since(*last) >= cooldown);
212                if should_emit {
213                    guard.insert(which, now);
214                }
215                should_emit
216            }
217            Err(poisoned) => {
218                let mut guard = poisoned.into_inner();
219                let should_emit = guard
220                    .get(which)
221                    .is_none_or(|last| now.saturating_duration_since(*last) >= cooldown);
222                if should_emit {
223                    guard.insert(which, now);
224                }
225                should_emit
226            }
227        };
228
229        if should_warn {
230            tracing::warn!(which = which, "CRL map cap exceeded; dropping newest entry");
231        }
232    }
233
234    async fn insert_cache_entry(&self, url: String, cached: CachedCrl) -> bool {
235        // POLICY: at cap the NEWEST entry is rejected, never an existing
236        // one (no LRU). Under adversarial unique-CDP churn an LRU would
237        // let an attacker evict the legitimate warm set by spamming
238        // throwaway CDP URLs; rejecting newcomers instead preserves
239        // revocation coverage for the established CA estate. Confirmed
240        // by Oracle review of the 1.13.0 rust-review fix plan.
241        let inserted = {
242            let mut guard = self.cache.write().await;
243            if guard.len() >= self.config.crl_max_cache_entries && !guard.contains_key(&url) {
244                false
245            } else {
246                guard.insert(url.clone(), cached);
247                true
248            }
249        };
250
251        if inserted {
252            match self.cached_urls.lock() {
253                Ok(mut cached_urls) => {
254                    cached_urls.insert(url);
255                }
256                Err(poisoned) => {
257                    poisoned.into_inner().insert(url);
258                }
259            }
260        } else {
261            self.warn_cap_exceeded_throttled("cache");
262        }
263
264        inserted
265    }
266
267    /// Force an immediate refresh of all currently known CRL URLs.
268    ///
269    /// # Errors
270    ///
271    /// Returns an error if rebuilding the inner verifier fails.
272    pub async fn force_refresh(&self) -> Result<(), McpxError> {
273        let urls = {
274            let cache = self.cache.read().await;
275            cache.keys().cloned().collect::<Vec<_>>()
276        };
277        self.refresh_urls(urls).await
278    }
279
280    async fn refresh_due_urls(&self) -> Result<(), McpxError> {
281        let now = SystemTime::now();
282        let urls = {
283            let cache = self.cache.read().await;
284            cache
285                .iter()
286                .filter(|(_, cached)| {
287                    should_refresh_cached(cached, now, self.config.crl_refresh_interval)
288                })
289                .map(|(url, _)| url.clone())
290                .collect::<Vec<_>>()
291        };
292
293        if urls.is_empty() {
294            return Ok(());
295        }
296
297        self.refresh_urls(urls).await
298    }
299
300    async fn refresh_urls(&self, urls: Vec<String>) -> Result<(), McpxError> {
301        let results = self.fetch_url_results(urls).await;
302        let now = SystemTime::now();
303        let mut cache = self.cache.write().await;
304        let mut changed = false;
305
306        for (url, result) in results {
307            match result {
308                Ok(cached) => {
309                    if cache.len() >= self.config.crl_max_cache_entries && !cache.contains_key(&url)
310                    {
311                        drop(cache);
312                        self.warn_cap_exceeded_throttled("cache");
313                        cache = self.cache.write().await;
314                        continue;
315                    }
316                    cache.insert(url.clone(), cached);
317                    changed = true;
318                    match self.cached_urls.lock() {
319                        Ok(mut cached_urls) => {
320                            cached_urls.insert(url);
321                        }
322                        Err(poisoned) => {
323                            poisoned.into_inner().insert(url);
324                        }
325                    }
326                }
327                Err(error) => {
328                    let remove_entry = cache.get(&url).is_some_and(|existing| {
329                        existing
330                            .next_update
331                            .and_then(|next| next.checked_add(self.config.crl_stale_grace))
332                            .is_some_and(|deadline| now > deadline)
333                    });
334                    tracing::warn!(url = %url, error = %error, "CRL refresh failed");
335                    if remove_entry {
336                        cache.remove(&url);
337                        changed = true;
338                        match self.cached_urls.lock() {
339                            Ok(mut cached_urls) => {
340                                cached_urls.remove(&url);
341                            }
342                            Err(poisoned) => {
343                                poisoned.into_inner().remove(&url);
344                            }
345                        }
346                        match self.seen_urls.lock() {
347                            Ok(mut seen_urls) => {
348                                seen_urls.remove(&url);
349                            }
350                            Err(poisoned) => {
351                                poisoned.into_inner().remove(&url);
352                            }
353                        }
354                    }
355                }
356            }
357        }
358
359        if changed {
360            self.swap_verifier_from_cache(&cache)?;
361        }
362        drop(cache);
363
364        Ok(())
365    }
366
367    async fn fetch_and_store_url(&self, url: String) -> Result<(), McpxError> {
368        let cached = gated_fetch(
369            &self.client,
370            &self.global_fetch_sem,
371            &self.host_semaphores,
372            &url,
373            self.config.crl_allow_http,
374            self.max_response_bytes,
375            self.config.crl_max_host_semaphores,
376        )
377        .await?;
378        if !self.insert_cache_entry(url, cached).await {
379            return Ok(());
380        }
381        let cache = self.cache.read().await;
382        self.swap_verifier_from_cache(&cache)?;
383        Ok(())
384    }
385
386    fn note_discovered_urls(&self, urls: &[String]) -> bool {
387        // INVARIANT: only called post-handshake from
388        // `DynamicClientCertVerifier::verify_client_cert`. The peer has
389        // already presented a chain that parses; this method must not panic
390        // under attacker-controlled URL contents.
391        //
392        // SECURITY: see `DynamicClientCertVerifier::verify_client_cert` for
393        // the rationale on why accepting URLs from an unverified cert is
394        // safe (no HTTP on this path; fetch is off-path and SSRF-gated).
395        let mut missing_cached = false;
396
397        // Snapshot the dedup set under the lock; do NOT mutate it yet.
398        // We promote a URL to "seen" only after it is actually admitted
399        // by the rate-limiter and queued on the discover channel.
400        // Otherwise a single rate-limited handshake would permanently
401        // black-hole the URL: every subsequent handshake would see it as
402        // "already known" and skip the limiter entirely, while the
403        // background fetcher would never have received it. With
404        // `crl_deny_on_unavailable = true` that produces persistent
405        // handshake failures; with fail-open it silently disables CRL
406        // discovery for that endpoint forever.
407        let candidates: Vec<String> = match self.seen_urls.lock() {
408            Ok(seen) => urls
409                .iter()
410                .filter(|url| !seen.contains(*url))
411                .cloned()
412                .collect(),
413            Err(_) => Vec::new(),
414        };
415
416        // Rate-limit gate: drop excess submissions on the floor with a WARN.
417        // The mTLS verifier must remain non-blocking, so we use the
418        // synchronous `check()` API and never await here. Only on a
419        // successful `check()` AND a successful `send()` do we commit
420        // the URL to `seen_urls`; this guarantees retriability of any
421        // URL that lost the limiter race.
422        for url in candidates {
423            if self.discovery_limiter.check().is_err() {
424                tracing::warn!(
425                    url = %url,
426                    "discovery_rate_limited: dropped CDP URL beyond per-minute cap (will be retried on next handshake observing this URL)"
427                );
428                continue;
429            }
430            if self.discover_tx.send(url.clone()).is_err() {
431                // Receiver gone (shutdown). Do NOT mark as seen so the
432                // URL can be retried after a reload / restart.
433                tracing::debug!(
434                    url = %url,
435                    "discover channel closed; dropping CDP URL without marking seen"
436                );
437                continue;
438            }
439            // Admission succeeded: now safe to dedup permanently.
440            let mut guard = self
441                .seen_urls
442                .lock()
443                .unwrap_or_else(std::sync::PoisonError::into_inner);
444            if guard.len() >= self.config.crl_max_seen_urls {
445                self.warn_cap_exceeded_throttled("seen_urls");
446                break;
447            }
448            guard.insert(url);
449        }
450
451        if self.config.crl_deny_on_unavailable {
452            let cached = self
453                .cached_urls
454                .lock()
455                .ok()
456                .map(|guard| guard.clone())
457                .unwrap_or_default();
458            missing_cached = urls.iter().any(|url| !cached.contains(url));
459        }
460
461        missing_cached
462    }
463
464    /// Test helper for constructing a CRL set from in-memory CRLs.
465    ///
466    /// # Errors
467    ///
468    /// Returns an error if the verifier cannot be built from the provided CRLs.
469    #[doc(hidden)]
470    pub fn __test_with_prepopulated_crls(
471        roots: Arc<RootCertStore>,
472        config: MtlsConfig,
473        prefilled_crls: Vec<CertificateRevocationListDer<'static>>,
474    ) -> Result<Arc<Self>, McpxError> {
475        let (discover_tx, discover_rx) = mpsc::unbounded_channel();
476        drop(discover_rx);
477
478        let mut initial_cache = HashMap::new();
479        for (index, der) in prefilled_crls.into_iter().enumerate() {
480            let source_url = format!("memory://crl/{index}");
481            let (this_update, next_update) = parse_crl_metadata(der.as_ref())?;
482            initial_cache.insert(
483                source_url.clone(),
484                CachedCrl {
485                    der,
486                    this_update,
487                    next_update,
488                    fetched_at: SystemTime::now(),
489                    source_url,
490                },
491            );
492        }
493
494        Self::new(roots, config, discover_tx, initial_cache)
495    }
496
497    /// Test-only: same as [`Self::__test_with_prepopulated_crls`] but
498    /// returns the discover-channel receiver to the caller so the
499    /// background channel `send`s succeed (the receiver stays alive
500    /// for the duration of the test). Required by the B2 dedup
501    /// regression test, which must observe URLs being committed to
502    /// `seen_urls` after a successful limiter+send sequence. Not part
503    /// of the public API.
504    ///
505    /// # Errors
506    ///
507    /// Returns an error if the verifier cannot be built from the provided CRLs.
508    #[doc(hidden)]
509    pub fn __test_with_kept_receiver(
510        roots: Arc<RootCertStore>,
511        config: MtlsConfig,
512        prefilled_crls: Vec<CertificateRevocationListDer<'static>>,
513    ) -> Result<(Arc<Self>, mpsc::UnboundedReceiver<String>), McpxError> {
514        let (discover_tx, discover_rx) = mpsc::unbounded_channel();
515
516        let mut initial_cache = HashMap::new();
517        for (index, der) in prefilled_crls.into_iter().enumerate() {
518            let source_url = format!("memory://crl/{index}");
519            let (this_update, next_update) = parse_crl_metadata(der.as_ref())?;
520            initial_cache.insert(
521                source_url.clone(),
522                CachedCrl {
523                    der,
524                    this_update,
525                    next_update,
526                    fetched_at: SystemTime::now(),
527                    source_url,
528                },
529            );
530        }
531
532        let crl_set = Self::new(roots, config, discover_tx, initial_cache)?;
533        Ok((crl_set, discover_rx))
534    }
535
536    /// Test-only: directly invoke the discovery rate-limiter on a batch of URLs
537    /// and return `(accepted, dropped)`. Bypasses the dedup `seen_urls` set so
538    /// callers can deterministically saturate the limiter; mutates the limiter
539    /// state in place. Not part of the public API.
540    #[doc(hidden)]
541    pub fn __test_check_discovery_rate(&self, urls: &[String]) -> (usize, usize) {
542        let mut accepted = 0usize;
543        let mut dropped = 0usize;
544        for url in urls {
545            if self.discovery_limiter.check().is_ok() {
546                let _ = self.discover_tx.send(url.clone());
547                accepted += 1;
548            } else {
549                dropped += 1;
550            }
551        }
552        (accepted, dropped)
553    }
554
555    /// Test-only: invoke the real `note_discovered_urls` so dedup + rate-limit
556    /// + cached-fallback paths are all exercised. Returns the `missing_cached`
557    /// flag the production verifier uses to decide whether to fail the handshake.
558    #[doc(hidden)]
559    pub fn __test_note_discovered_urls(&self, urls: &[String]) -> bool {
560        let missing_cached = self.note_discovered_urls(urls);
561        if self.discover_tx.is_closed() {
562            match self.seen_urls.lock() {
563                Ok(mut guard) => {
564                    for url in urls {
565                        if guard.contains(url) {
566                            continue;
567                        }
568                        if guard.len() >= self.config.crl_max_seen_urls {
569                            self.warn_cap_exceeded_throttled("seen_urls");
570                            break;
571                        }
572                        guard.insert(url.clone());
573                    }
574                }
575                Err(poisoned) => {
576                    let mut guard = poisoned.into_inner();
577                    for url in urls {
578                        if guard.contains(url) {
579                            continue;
580                        }
581                        if guard.len() >= self.config.crl_max_seen_urls {
582                            self.warn_cap_exceeded_throttled("seen_urls");
583                            break;
584                        }
585                        guard.insert(url.clone());
586                    }
587                }
588            }
589        }
590        missing_cached
591    }
592
593    /// Test-only: report whether a URL has been promoted to the
594    /// permanent dedup set. Used by the B2 retriability regression
595    /// test to assert that rate-limited URLs are NOT marked seen.
596    /// Not part of the public API.
597    #[doc(hidden)]
598    pub fn __test_is_seen(&self, url: &str) -> bool {
599        match self.seen_urls.lock() {
600            Ok(seen) => seen.contains(url),
601            Err(_) => false,
602        }
603    }
604
605    /// Test-only: current count of host semaphores. Used by
606    /// `tests/crl_map_bounds.rs` to assert the cap is enforced.
607    #[cfg(any(test, feature = "test-helpers"))]
608    #[doc(hidden)]
609    pub fn __test_host_semaphore_count(&self) -> usize {
610        self.host_semaphores
611            .try_lock()
612            .map_or(0, |guard| guard.len())
613    }
614
615    /// Test-only: current number of entries in the CRL cache.
616    #[cfg(any(test, feature = "test-helpers"))]
617    #[doc(hidden)]
618    pub fn __test_cache_len(&self) -> usize {
619        self.cache.try_read().map_or(0, |guard| guard.len())
620    }
621
622    /// Test-only: whether a specific URL is currently cached.
623    #[cfg(any(test, feature = "test-helpers"))]
624    #[doc(hidden)]
625    pub fn __test_cache_contains(&self, url: &str) -> bool {
626        self.cache
627            .try_read()
628            .is_ok_and(|guard| guard.contains_key(url))
629    }
630
631    /// Test-only: triggers the request-hot-path fetch path for `url`
632    /// WITHOUT going through the TLS handshake. Returns any error the
633    /// host-semaphore cap check produces. A network-unreachable
634    /// failure for the fetch itself is treated as `Ok(())` (test only
635    /// cares about the cap; real tests use mock hosts that won't
636    /// resolve — the cap must fire BEFORE network I/O).
637    #[cfg(any(test, feature = "test-helpers"))]
638    #[doc(hidden)]
639    pub async fn __test_trigger_fetch(&self, url: &str) -> Result<(), McpxError> {
640        if let Err(error) = gated_fetch(
641            &self.client,
642            &self.global_fetch_sem,
643            &self.host_semaphores,
644            url,
645            self.config.crl_allow_http,
646            self.max_response_bytes,
647            self.config.crl_max_host_semaphores,
648        )
649        .await
650        {
651            if error
652                .to_string()
653                .contains("crl_host_semaphore_cap_exceeded")
654            {
655                Err(error)
656            } else {
657                Ok(())
658            }
659        } else {
660            Ok(())
661        }
662    }
663
664    /// Test-only: directly insert `cached` under `url` into both
665    /// `cache` and `cached_urls`, bypassing HTTP. Does NOT enforce
666    /// `crl_max_cache_entries` when called pre-cap — the test uses it
667    /// to stage preconditions. For cap-breach coverage, tests invoke
668    /// the real production insertion path.
669    ///
670    /// Wait — the `cache_hard_cap_drops_newest` test DOES use this
671    /// helper to assert the cap fires. Therefore this helper MUST
672    /// enforce the hard cap (silent drop with warn!) the same way the
673    /// production code does. The helper is a thin wrapper around the
674    /// same internal insertion fn the production path uses.
675    #[cfg(any(test, feature = "test-helpers"))]
676    #[doc(hidden)]
677    pub async fn __test_insert_cache(&self, url: &str, cached: CachedCrl) {
678        let _ = self.insert_cache_entry(url.to_owned(), cached).await;
679    }
680
681    /// Test-only: trigger a refresh cycle for a single URL. Exercises
682    /// the same stale-grace / fetch-failure path as `refresh_urls()`.
683    /// Returns the refresh error (if any) — most tests ignore it
684    /// because they assert post-state, not the transient error.
685    #[cfg(any(test, feature = "test-helpers"))]
686    #[doc(hidden)]
687    pub async fn __test_trigger_refresh_url(&self, url: &str) -> Result<(), McpxError> {
688        self.refresh_urls(vec![url.to_owned()]).await
689    }
690
691    async fn fetch_url_results(
692        &self,
693        urls: Vec<String>,
694    ) -> Vec<(String, Result<CachedCrl, McpxError>)> {
695        let mut tasks = JoinSet::new();
696        for url in urls {
697            let client = self.client.clone();
698            let global_sem = Arc::clone(&self.global_fetch_sem);
699            let host_map = Arc::clone(&self.host_semaphores);
700            let allow_http = self.config.crl_allow_http;
701            let max_bytes = self.max_response_bytes;
702            let max_host_semaphores = self.config.crl_max_host_semaphores;
703            tasks.spawn(async move {
704                let result = gated_fetch(
705                    &client,
706                    &global_sem,
707                    &host_map,
708                    &url,
709                    allow_http,
710                    max_bytes,
711                    max_host_semaphores,
712                )
713                .await;
714                (url, result)
715            });
716        }
717
718        let mut results = Vec::new();
719        while let Some(joined) = tasks.join_next().await {
720            match joined {
721                Ok(result) => results.push(result),
722                Err(error) => {
723                    tracing::warn!(error = %error, "CRL refresh task join failed");
724                }
725            }
726        }
727
728        results
729    }
730
731    fn swap_verifier_from_cache(
732        &self,
733        cache: &impl std::ops::Deref<Target = HashMap<String, CachedCrl>>,
734    ) -> Result<(), McpxError> {
735        let verifier = rebuild_verifier(&self.roots, &self.config, cache)?;
736        self.inner_verifier
737            .store(Arc::new(VerifierHandle(verifier)));
738        Ok(())
739    }
740}
741
742impl CachedCrl {
743    /// Test-only: synthesize a cache entry that looks valid, `next_update`
744    /// = now + 24h. Fields used only to populate the HashMap — the bytes
745    /// are a minimal CRL-shape that won't be parsed by tests.
746    #[cfg(any(test, feature = "test-helpers"))]
747    #[doc(hidden)]
748    #[must_use]
749    pub fn __test_synthetic(now: SystemTime) -> Self {
750        Self {
751            der: CertificateRevocationListDer::from(vec![0x30, 0x00]),
752            this_update: now,
753            next_update: now.checked_add(Duration::from_hours(24)),
754            fetched_at: now,
755            source_url: "test://synthetic".to_owned(),
756        }
757    }
758
759    /// Test-only: synthesize a STALE cache entry (`next_update` in the
760    /// deep past so `is_stale_beyond_grace` fires with any sensible
761    /// `crl_stale_grace`).
762    #[cfg(any(test, feature = "test-helpers"))]
763    #[doc(hidden)]
764    #[must_use]
765    pub fn __test_stale(reference_past: SystemTime) -> Self {
766        Self {
767            der: CertificateRevocationListDer::from(vec![0x30, 0x00]),
768            this_update: reference_past,
769            next_update: Some(reference_past),
770            fetched_at: reference_past,
771            source_url: "test://stale".to_owned(),
772        }
773    }
774}
775
776/// Stable outer verifier that delegates all TLS verification behavior to the
777/// atomically swappable inner verifier.
778pub struct DynamicClientCertVerifier {
779    inner: Arc<CrlSet>,
780    dn_subjects: Vec<DistinguishedName>,
781}
782
783impl DynamicClientCertVerifier {
784    /// Construct a new dynamic verifier from a shared [`CrlSet`].
785    #[must_use]
786    pub fn new(inner: Arc<CrlSet>) -> Self {
787        Self {
788            dn_subjects: inner.roots.subjects(),
789            inner,
790        }
791    }
792}
793
794impl std::fmt::Debug for DynamicClientCertVerifier {
795    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
796        f.debug_struct("DynamicClientCertVerifier")
797            .field("dn_subjects_len", &self.dn_subjects.len())
798            .finish_non_exhaustive()
799    }
800}
801
802impl ClientCertVerifier for DynamicClientCertVerifier {
803    fn offer_client_auth(&self) -> bool {
804        let verifier = self.inner.inner_verifier.load();
805        verifier.0.offer_client_auth()
806    }
807
808    fn client_auth_mandatory(&self) -> bool {
809        let verifier = self.inner.inner_verifier.load();
810        verifier.0.client_auth_mandatory()
811    }
812
813    fn root_hint_subjects(&self) -> &[DistinguishedName] {
814        &self.dn_subjects
815    }
816
817    fn verify_client_cert(
818        &self,
819        end_entity: &CertificateDer<'_>,
820        intermediates: &[CertificateDer<'_>],
821        now: UnixTime,
822    ) -> Result<ClientCertVerified, TlsError> {
823        // SECURITY: extracting CDP URLs from an unverified client cert
824        // here is intentional. No HTTP happens on this path -- the call
825        // to `note_discovered_urls` only enqueues onto a bounded,
826        // rate-limited channel. The actual fetch runs off-path in
827        // `run_crl_refresher` and is gated by SSRF screening
828        // (`src/ssrf.rs`), body-size cap, deadline, and the
829        // `crl_allow_http` policy. CRLs are CA-signed (RFC 5280 §5), so
830        // http(s) CDP URLs are protocol design, not an SSRF sink. The
831        // discovery must happen BEFORE delegating to the inner verifier
832        // so `crl_deny_on_unavailable = true` can fail-closed on a
833        // never-fetched CDP. Do NOT reorder.
834        let mut discovered =
835            extract_cdp_urls(end_entity.as_ref(), self.inner.config.crl_allow_http);
836        for intermediate in intermediates {
837            discovered.extend(extract_cdp_urls(
838                intermediate.as_ref(),
839                self.inner.config.crl_allow_http,
840            ));
841        }
842        discovered.sort();
843        discovered.dedup();
844
845        if self.inner.note_discovered_urls(&discovered) {
846            return Err(TlsError::General(
847                "client certificate revocation status unavailable".to_owned(),
848            ));
849        }
850
851        let verifier = self.inner.inner_verifier.load();
852        verifier
853            .0
854            .verify_client_cert(end_entity, intermediates, now)
855    }
856
857    fn verify_tls12_signature(
858        &self,
859        message: &[u8],
860        cert: &CertificateDer<'_>,
861        dss: &DigitallySignedStruct,
862    ) -> Result<HandshakeSignatureValid, TlsError> {
863        let verifier = self.inner.inner_verifier.load();
864        verifier.0.verify_tls12_signature(message, cert, dss)
865    }
866
867    fn verify_tls13_signature(
868        &self,
869        message: &[u8],
870        cert: &CertificateDer<'_>,
871        dss: &DigitallySignedStruct,
872    ) -> Result<HandshakeSignatureValid, TlsError> {
873        let verifier = self.inner.inner_verifier.load();
874        verifier.0.verify_tls13_signature(message, cert, dss)
875    }
876
877    fn supported_verify_schemes(&self) -> Vec<SignatureScheme> {
878        let verifier = self.inner.inner_verifier.load();
879        verifier.0.supported_verify_schemes()
880    }
881
882    fn requires_raw_public_keys(&self) -> bool {
883        let verifier = self.inner.inner_verifier.load();
884        verifier.0.requires_raw_public_keys()
885    }
886}
887
888/// Extract CRL Distribution Point URLs from a DER-encoded certificate.
889///
890/// URLs are validated with `url::Url::parse` (case-insensitive scheme handling)
891/// and filtered through an internal scheme guard. Malformed URLs, URLs
892/// using disallowed schemes, and URLs carrying embedded credentials
893/// (userinfo) are silently dropped. SSRF defenses against private
894/// IP literals and metadata endpoints are applied later, at fetch time, after
895/// DNS resolution.
896#[must_use]
897pub fn extract_cdp_urls(cert_der: &[u8], allow_http: bool) -> Vec<String> {
898    let Ok((_, cert)) = X509Certificate::from_der(cert_der) else {
899        return Vec::new();
900    };
901
902    let mut urls = Vec::new();
903    for ext in cert.extensions() {
904        if let ParsedExtension::CRLDistributionPoints(cdps) = ext.parsed_extension() {
905            for point in cdps.iter() {
906                if let Some(DistributionPointName::FullName(names)) = &point.distribution_point {
907                    for name in names {
908                        if let GeneralName::URI(uri) = name {
909                            let raw = *uri;
910                            let Ok(parsed) = Url::parse(raw) else {
911                                // `?raw` (Debug) escapes control characters the
912                                // failed parse may have left in this
913                                // attacker-supplied string.
914                                tracing::debug!(url = ?raw, "CDP URL parse failed; dropped");
915                                continue;
916                            };
917                            if let Err(reason) = check_scheme(&parsed, allow_http) {
918                                tracing::debug!(
919                                    url = %sanitized_url_for_log(&parsed),
920                                    reason,
921                                    "CDP URL rejected by scheme guard; dropped"
922                                );
923                                continue;
924                            }
925                            urls.push(parsed.into());
926                        }
927                    }
928                }
929            }
930        }
931    }
932
933    urls
934}
935
936/// Bootstrap the CRL cache by extracting CDP URLs from the CA chain and
937/// fetching any reachable CRLs with a 10-second total deadline.
938///
939/// # Errors
940///
941/// Returns an error if the initial verifier cannot be built.
942#[allow(
943    clippy::cognitive_complexity,
944    reason = "bootstrap coordinates timeout, parallel fetches, and partial-cache recovery"
945)]
946pub async fn bootstrap_fetch(
947    roots: Arc<RootCertStore>,
948    ca_certs: &[CertificateDer<'static>],
949    config: MtlsConfig,
950) -> Result<(Arc<CrlSet>, mpsc::UnboundedReceiver<String>), McpxError> {
951    let (discover_tx, discover_rx) = mpsc::unbounded_channel();
952
953    let mut urls = ca_certs
954        .iter()
955        .flat_map(|cert| extract_cdp_urls(cert.as_ref(), config.crl_allow_http))
956        .collect::<Vec<_>>();
957    urls.sort();
958    urls.dedup();
959
960    // M-H2: same SSRF resolver hardening as CrlSet::new -- bootstrap
961    // fetches the same attacker-controlled CDP URLs, just earlier in
962    // the lifecycle.
963    let bootstrap_allowlist = Arc::new(crate::ssrf::CompiledSsrfAllowlist::default());
964    let bootstrap_resolver: Arc<dyn reqwest::dns::Resolve> =
965        Arc::new(crate::ssrf_resolver::SsrfScreeningResolver::new(
966            Arc::clone(&bootstrap_allowlist),
967            #[cfg(any(test, feature = "test-helpers"))]
968            Arc::new(std::sync::atomic::AtomicBool::new(false)),
969            #[cfg(not(any(test, feature = "test-helpers")))]
970            (),
971        ));
972
973    let client = reqwest::Client::builder()
974        // M-H2/N1: see oauth.rs::OauthHttpClient::build for rationale.
975        .no_proxy()
976        .dns_resolver(Arc::clone(&bootstrap_resolver))
977        .timeout(config.crl_fetch_timeout)
978        .connect_timeout(CRL_CONNECT_TIMEOUT)
979        .tcp_keepalive(None)
980        .redirect(reqwest::redirect::Policy::none())
981        .user_agent(format!("rmcp-server-kit/{}", env!("CARGO_PKG_VERSION")))
982        .build()
983        .map_err(|error| McpxError::Startup(format!("CRL HTTP client init: {error}")))?;
984
985    // Bootstrap shares the same global concurrency + per-host cap as the
986    // hot-path verifier so a maliciously broad CA chain cannot overwhelm
987    // the network at startup.
988    let bootstrap_concurrency = config.crl_max_concurrent_fetches.max(1);
989    let global_sem = Arc::new(Semaphore::new(bootstrap_concurrency));
990    let host_semaphores = Arc::new(tokio::sync::Mutex::new(HashMap::new()));
991    let allow_http = config.crl_allow_http;
992    let max_bytes = config.crl_max_response_bytes;
993    let max_host_semaphores = config.crl_max_host_semaphores;
994
995    let mut initial_cache = HashMap::new();
996    let mut tasks = JoinSet::new();
997    for url in &urls {
998        let client = client.clone();
999        let url = url.clone();
1000        let global_sem = Arc::clone(&global_sem);
1001        let host_semaphores = Arc::clone(&host_semaphores);
1002        tasks.spawn(async move {
1003            let result = gated_fetch(
1004                &client,
1005                &global_sem,
1006                &host_semaphores,
1007                &url,
1008                allow_http,
1009                max_bytes,
1010                max_host_semaphores,
1011            )
1012            .await;
1013            (url, result)
1014        });
1015    }
1016
1017    let timeout: Sleep = tokio::time::sleep(BOOTSTRAP_TIMEOUT);
1018    tokio::pin!(timeout);
1019
1020    while !tasks.is_empty() {
1021        // cancel-safe: pinned Sleep and JoinSet::join_next are cancel-safe
1022        // (tokio docs); on timeout the loop breaks and dropping the JoinSet
1023        // aborts remaining fetches — the intended deadline behavior.
1024        tokio::select! {
1025            () = &mut timeout => {
1026                tracing::warn!("CRL bootstrap timed out after {:?}", BOOTSTRAP_TIMEOUT);
1027                break;
1028            }
1029            maybe_joined = tasks.join_next() => {
1030                let Some(joined) = maybe_joined else {
1031                    break;
1032                };
1033                match joined {
1034                    Ok((url, Ok(cached))) => {
1035                        initial_cache.insert(url, cached);
1036                    }
1037                    Ok((url, Err(error))) => {
1038                        tracing::warn!(url = %url, error = %error, "CRL bootstrap fetch failed");
1039                    }
1040                    Err(error) => {
1041                        tracing::warn!(error = %error, "CRL bootstrap task join failed");
1042                    }
1043                }
1044            }
1045        }
1046    }
1047
1048    let set = CrlSet::new(roots, config, discover_tx, initial_cache)?;
1049    Ok((set, discover_rx))
1050}
1051
1052/// Run the CRL refresher loop until shutdown.
1053#[allow(
1054    clippy::cognitive_complexity,
1055    reason = "refresher loop intentionally handles shutdown, timer, and discovery in one select"
1056)]
1057pub async fn run_crl_refresher(
1058    set: Arc<CrlSet>,
1059    mut discover_rx: mpsc::UnboundedReceiver<String>,
1060    shutdown: CancellationToken,
1061) {
1062    let mut refresh_sleep = schedule_next_refresh(&set).await;
1063
1064    loop {
1065        // cancel-safe: CancellationToken::cancelled, pinned &mut Sleep, and
1066        // mpsc::UnboundedReceiver::recv are all cancel-safe (tokio docs);
1067        // refresh work happens inside arm bodies, never in the raced futures.
1068        tokio::select! {
1069            () = shutdown.cancelled() => {
1070                break;
1071            }
1072            () = &mut refresh_sleep => {
1073                if let Err(error) = set.refresh_due_urls().await {
1074                    tracing::warn!(error = %error, "CRL periodic refresh failed");
1075                }
1076                refresh_sleep = schedule_next_refresh(&set).await;
1077            }
1078            maybe_url = discover_rx.recv() => {
1079                let Some(url) = maybe_url else {
1080                    break;
1081                };
1082                if let Err(error) = set.fetch_and_store_url(url.clone()).await {
1083                    tracing::warn!(url = %url, error = %error, "CRL discovery fetch failed");
1084                }
1085                refresh_sleep = schedule_next_refresh(&set).await;
1086            }
1087        }
1088    }
1089}
1090
1091/// Rebuild the inner rustls verifier from the current CRL cache.
1092///
1093/// # Errors
1094///
1095/// Returns an error if rustls rejects the verifier configuration.
1096pub fn rebuild_verifier<S: std::hash::BuildHasher>(
1097    roots: &Arc<RootCertStore>,
1098    config: &MtlsConfig,
1099    cache: &HashMap<String, CachedCrl, S>,
1100) -> Result<Arc<dyn ClientCertVerifier>, McpxError> {
1101    let mut builder = WebPkiClientVerifier::builder(Arc::clone(roots));
1102
1103    if !cache.is_empty() {
1104        let crls = cache
1105            .values()
1106            .map(|cached| cached.der.clone())
1107            .collect::<Vec<_>>();
1108        builder = builder.with_crls(crls);
1109    }
1110    if config.crl_end_entity_only {
1111        builder = builder.only_check_end_entity_revocation();
1112    }
1113    if !config.crl_deny_on_unavailable {
1114        builder = builder.allow_unknown_revocation_status();
1115    }
1116    if config.crl_enforce_expiration {
1117        builder = builder.enforce_revocation_expiration();
1118    }
1119    if !config.required {
1120        builder = builder.allow_unauthenticated();
1121    }
1122
1123    builder
1124        .build()
1125        .map_err(|error| McpxError::Tls(format!("mTLS verifier error: {error}")))
1126}
1127
1128/// Parse `thisUpdate` and `nextUpdate` metadata from a DER-encoded CRL.
1129///
1130/// # Errors
1131///
1132/// Returns an error if the CRL cannot be parsed.
1133pub fn parse_crl_metadata(der: &[u8]) -> Result<(SystemTime, Option<SystemTime>), McpxError> {
1134    let (_, crl) = CertificateRevocationList::from_der(der)
1135        .map_err(|error| McpxError::Tls(format!("invalid CRL DER: {error:?}")))?;
1136
1137    Ok((
1138        asn1_time_to_system_time(crl.last_update()),
1139        crl.next_update().map(asn1_time_to_system_time),
1140    ))
1141}
1142
1143async fn schedule_next_refresh(set: &CrlSet) -> Pin<Box<Sleep>> {
1144    let duration = next_refresh_delay(set).await;
1145    boxed_sleep(duration)
1146}
1147
1148fn boxed_sleep(duration: Duration) -> Pin<Box<Sleep>> {
1149    Box::pin(tokio::time::sleep_until(Instant::now() + duration))
1150}
1151
1152async fn next_refresh_delay(set: &CrlSet) -> Duration {
1153    if let Some(interval) = set.config.crl_refresh_interval {
1154        return clamp_refresh(interval);
1155    }
1156
1157    let now = SystemTime::now();
1158    let cache = set.cache.read().await;
1159    let mut next = MAX_AUTO_REFRESH;
1160
1161    for cached in cache.values() {
1162        if let Some(next_update) = cached.next_update {
1163            let duration = next_update.duration_since(now).unwrap_or(Duration::ZERO);
1164            next = next.min(clamp_refresh(duration));
1165        }
1166    }
1167    drop(cache);
1168
1169    next
1170}
1171
1172/// Get-or-insert the per-host fetch semaphore for `host_key`.
1173///
1174/// When the map is at `max_host_semaphores`, idle entries (no in-flight
1175/// fetch) are evicted before rejecting, so the cap only fails when `max`
1176/// distinct hosts are *concurrently* fetching — it is never a permanent
1177/// lockout. Every clone of a host semaphore is created while holding the
1178/// map lock, and a clone outlives the critical section only while a fetch
1179/// is in flight, so an entry with `Arc::strong_count == 1` is provably
1180/// idle and safe to drop.
1181fn acquire_host_semaphore(
1182    map: &mut HashMap<String, Arc<Semaphore>>,
1183    host_key: &str,
1184    max_host_semaphores: usize,
1185) -> Result<Arc<Semaphore>, McpxError> {
1186    if !map.contains_key(host_key) {
1187        if map.len() >= max_host_semaphores {
1188            // Self-heal: drop semaphores with no in-flight fetch.
1189            map.retain(|_, semaphore| Arc::strong_count(semaphore) > 1);
1190        }
1191        if map.len() >= max_host_semaphores {
1192            return Err(McpxError::Config(
1193                "crl_host_semaphore_cap_exceeded: too many distinct CRL hosts in flight".to_owned(),
1194            ));
1195        }
1196        map.insert(host_key.to_owned(), Arc::new(Semaphore::new(1)));
1197    }
1198    match map.get(host_key) {
1199        Some(semaphore) => Ok(Arc::clone(semaphore)),
1200        None => Err(McpxError::Tls(
1201            "CRL host semaphore missing after insertion".to_owned(),
1202        )),
1203    }
1204}
1205
1206/// Fetch a single CRL URL through the global + per-host concurrency caps.
1207///
1208/// `global_sem` caps total simultaneous CRL fetches process-wide.
1209/// `host_semaphores` ensures at most one in-flight fetch per origin host
1210/// (an SSRF amplification defense); at the host cap, idle entries are
1211/// evicted on demand. Both permits are dropped when the returned future
1212/// completes (whether `Ok` or `Err`).
1213async fn gated_fetch(
1214    client: &reqwest::Client,
1215    global_sem: &Arc<Semaphore>,
1216    host_semaphores: &Arc<tokio::sync::Mutex<HashMap<String, Arc<Semaphore>>>>,
1217    url: &str,
1218    allow_http: bool,
1219    max_bytes: u64,
1220    max_host_semaphores: usize,
1221) -> Result<CachedCrl, McpxError> {
1222    let host_key = Url::parse(url)
1223        .ok()
1224        .and_then(|u| u.host_str().map(str::to_owned))
1225        .unwrap_or_else(|| url.to_owned());
1226
1227    let host_sem = {
1228        let mut map = host_semaphores.lock().await;
1229        acquire_host_semaphore(&mut map, &host_key, max_host_semaphores)?
1230    };
1231
1232    let _global_permit = Arc::clone(global_sem)
1233        .acquire_owned()
1234        .await
1235        .map_err(|error| McpxError::Tls(format!("CRL global semaphore closed: {error}")))?;
1236    let _host_permit = host_sem
1237        .acquire_owned()
1238        .await
1239        .map_err(|error| McpxError::Tls(format!("CRL host semaphore closed: {error}")))?;
1240
1241    fetch_crl(client, url, allow_http, max_bytes).await
1242}
1243
1244async fn fetch_crl(
1245    client: &reqwest::Client,
1246    url: &str,
1247    allow_http: bool,
1248    max_bytes: u64,
1249) -> Result<CachedCrl, McpxError> {
1250    let parsed =
1251        Url::parse(url).map_err(|error| McpxError::Tls(format!("CRL URL parse {url}: {error}")))?;
1252
1253    if let Err(reason) = check_scheme(&parsed, allow_http) {
1254        // Sanitized: the gate must not echo what it rejects (the URL may
1255        // carry userinfo credentials — the very thing being refused).
1256        let sanitized = sanitized_url_for_log(&parsed);
1257        tracing::warn!(url = %sanitized, reason, "CRL fetch denied: scheme");
1258        return Err(McpxError::Tls(format!(
1259            "CRL scheme rejected ({reason}): {sanitized}"
1260        )));
1261    }
1262
1263    let host = parsed
1264        .host_str()
1265        .ok_or_else(|| McpxError::Tls(format!("CRL URL has no host: {url}")))?;
1266    let port = parsed
1267        .port_or_known_default()
1268        .ok_or_else(|| McpxError::Tls(format!("CRL URL has no known port: {url}")))?;
1269
1270    let addrs = lookup_host((host, port))
1271        .await
1272        .map_err(|error| McpxError::Tls(format!("CRL DNS resolution {url}: {error}")))?;
1273
1274    let mut any_addr = false;
1275    for addr in addrs {
1276        any_addr = true;
1277        if let Some(reason) = ip_block_reason(addr.ip()) {
1278            tracing::warn!(
1279                url = %url,
1280                resolved_ip = %addr.ip(),
1281                reason,
1282                "CRL fetch denied: blocked IP"
1283            );
1284            return Err(McpxError::Tls(format!(
1285                "CRL host resolved to blocked IP ({reason}): {url}"
1286            )));
1287        }
1288    }
1289    if !any_addr {
1290        return Err(McpxError::Tls(format!(
1291            "CRL DNS resolution returned no addresses: {url}"
1292        )));
1293    }
1294
1295    let mut response = client
1296        .get(url)
1297        .send()
1298        .await
1299        .map_err(|error| McpxError::Tls(format!("CRL fetch {url}: {error}")))?
1300        .error_for_status()
1301        .map_err(|error| McpxError::Tls(format!("CRL fetch {url}: {error}")))?;
1302
1303    // Enforce body cap by streaming chunk-by-chunk; a malicious or
1304    // misconfigured server cannot allocate more than `max_bytes` of memory.
1305    let initial_capacity = usize::try_from(max_bytes.min(64 * 1024)).unwrap_or(64 * 1024);
1306    let mut body: Vec<u8> = Vec::with_capacity(initial_capacity);
1307    while let Some(chunk) = response
1308        .chunk()
1309        .await
1310        .map_err(|error| McpxError::Tls(format!("CRL read {url}: {error}")))?
1311    {
1312        let chunk_len = u64::try_from(chunk.len()).unwrap_or(u64::MAX);
1313        let body_len = u64::try_from(body.len()).unwrap_or(u64::MAX);
1314        if body_len.saturating_add(chunk_len) > max_bytes {
1315            return Err(McpxError::Tls(format!(
1316                "CRL body exceeded cap of {max_bytes} bytes: {url}"
1317            )));
1318        }
1319        body.extend_from_slice(&chunk);
1320    }
1321
1322    let der = CertificateRevocationListDer::from(body);
1323    let (this_update, next_update) = parse_crl_metadata(der.as_ref())?;
1324
1325    Ok(CachedCrl {
1326        der,
1327        this_update,
1328        next_update,
1329        fetched_at: SystemTime::now(),
1330        source_url: url.to_owned(),
1331    })
1332}
1333
1334fn should_refresh_cached(
1335    cached: &CachedCrl,
1336    now: SystemTime,
1337    fixed_interval: Option<Duration>,
1338) -> bool {
1339    if let Some(interval) = fixed_interval {
1340        return cached
1341            .fetched_at
1342            .checked_add(clamp_refresh(interval))
1343            .is_none_or(|deadline| now >= deadline);
1344    }
1345
1346    cached
1347        .next_update
1348        .is_none_or(|next_update| now >= next_update)
1349}
1350
1351fn clamp_refresh(duration: Duration) -> Duration {
1352    duration.clamp(MIN_AUTO_REFRESH, MAX_AUTO_REFRESH)
1353}
1354
1355/// 9999-12-31T23:59:59Z — the maximum instant expressible as an ASN.1
1356/// GeneralizedTime (four-digit year). Used to clamp absurd positive
1357/// timestamps before converting to [`SystemTime`].
1358const MAX_ASN1_TIMESTAMP_SECS: u64 = 253_402_300_799;
1359
1360/// Convert an ASN.1 time to [`SystemTime`] without ever panicking.
1361///
1362/// CRL metadata is parsed from raw fetched bytes *before* signature
1363/// validation, so timestamps are attacker-controlled. Platform
1364/// `SystemTime` ranges differ (Windows cannot represent pre-1601);
1365/// unrepresentable values are clamped toward [`UNIX_EPOCH`], which is the
1366/// safe direction: it can only make a CRL look *older* (forcing an
1367/// eager refresh), never fresher.
1368fn asn1_time_to_system_time(time: x509_parser::time::ASN1Time) -> SystemTime {
1369    let timestamp = time.timestamp();
1370    if timestamp >= 0 {
1371        let seconds = u64::try_from(timestamp)
1372            .unwrap_or(0)
1373            .min(MAX_ASN1_TIMESTAMP_SECS);
1374        UNIX_EPOCH
1375            .checked_add(Duration::from_secs(seconds))
1376            .unwrap_or(UNIX_EPOCH)
1377    } else {
1378        UNIX_EPOCH
1379            .checked_sub(Duration::from_secs(timestamp.unsigned_abs()))
1380            .unwrap_or(UNIX_EPOCH)
1381    }
1382}
1383
#[cfg(test)]
mod tests {
    use super::*;

    fn asn1(timestamp: i64) -> x509_parser::time::ASN1Time {
        x509_parser::time::ASN1Time::from_timestamp(timestamp).expect("valid ASN.1 timestamp")
    }

    /// The userinfo gate fires before DNS resolution (no network needed)
    /// and the surfaced error must not echo the rejected credentials.
    #[tokio::test]
    async fn fetch_crl_rejects_userinfo_without_echoing_credentials() {
        // reqwest with `rustls-no-provider` requires a process-wide crypto
        // provider before any Client is built (same pattern as the
        // transport/oauth test suites).
        let _ = rustls::crypto::ring::default_provider().install_default();
        let client = reqwest::Client::new();
        let err = fetch_crl(&client, "https://u:p@crl.example/ca.crl", false, 1024)
            .await
            .expect_err("userinfo-bearing CRL URL must be rejected");
        let rendered = err.to_string();
        assert!(
            rendered.contains("userinfo_forbidden"),
            "error must carry the rejection reason: {rendered}"
        );
        assert!(
            !rendered.contains("u:p"),
            "error must not echo the rejected credentials: {rendered}"
        );
    }

    /// `extract_cdp_urls`'s scheme/userinfo guard reuses the same gate;
    /// the sanitizer keeps credentials out of its debug logging too.
    #[test]
    fn sanitizer_used_by_rejection_sites_strips_credentials() {
        let parsed = Url::parse("https://u:p@crl.example/ca.crl").expect("parse");
        let sanitized = sanitized_url_for_log(&parsed);
        assert_eq!(sanitized, "https://crl.example");
        assert!(!sanitized.contains("u:p"));
    }

    #[test]
    fn asn1_time_clamps_unrepresentable_timestamps() {
        // Year 1500 — pre-1601, NOT representable by Windows `SystemTime`.
        // Pre-fix this panicked on Windows; now it must return a value no
        // later than the epoch on every platform (clamped to UNIX_EPOCH on
        // Windows, the real instant on platforms that can represent it).
        let year_1500 = asn1_time_to_system_time(asn1(-14_831_769_600));
        assert!(year_1500 <= UNIX_EPOCH);
        #[cfg(windows)]
        assert_eq!(year_1500, UNIX_EPOCH);

        // 1601-01-01T00:00:00Z — the exact Windows epoch boundary, which IS
        // representable everywhere. No clamp, no panic.
        let year_1601 = asn1_time_to_system_time(asn1(-11_644_473_600));
        assert!(year_1601 <= UNIX_EPOCH);

        // Mildly negative (pre-1970) stays at-or-before the epoch.
        assert!(asn1_time_to_system_time(asn1(-2)) <= UNIX_EPOCH);

        // Normal positive timestamps round-trip exactly.
        assert_eq!(
            asn1_time_to_system_time(asn1(1_700_000_000)),
            UNIX_EPOCH + Duration::from_secs(1_700_000_000)
        );

        // The ASN.1 maximum (9999-12-31) is representable and preserved.
        let max = i64::try_from(MAX_ASN1_TIMESTAMP_SECS).expect("fits in i64");
        assert_eq!(
            asn1_time_to_system_time(asn1(max)),
            UNIX_EPOCH + Duration::from_secs(MAX_ASN1_TIMESTAMP_SECS)
        );
    }

    #[test]
    fn host_semaphore_evicts_idle_at_cap() {
        let mut map = HashMap::new();
        for i in 0..4 {
            // Dropped immediately: only the map holds each semaphore (idle).
            drop(
                acquire_host_semaphore(&mut map, &format!("idle-{i}.example"), 4)
                    .expect("under cap"),
            );
        }
        assert_eq!(map.len(), 4);

        // At the cap, a NEW host must succeed by evicting idle entries —
        // the cap error is not sticky.
        let sem = acquire_host_semaphore(&mut map, "new-host.example", 4)
            .expect("idle eviction frees space for a new host");
        assert!(map.contains_key("new-host.example"));
        drop(sem);
    }

    #[test]
    fn host_semaphore_keeps_inflight_at_cap() {
        let mut map = HashMap::new();
        // Held across the cap check: simulates an in-flight fetch.
        let inflight = acquire_host_semaphore(&mut map, "busy.example", 3).expect("under cap");
        for i in 0..2 {
            drop(
                acquire_host_semaphore(&mut map, &format!("idle-{i}.example"), 3)
                    .expect("under cap"),
            );
        }
        assert_eq!(map.len(), 3);

        drop(
            acquire_host_semaphore(&mut map, "new-host.example", 3)
                .expect("idle entries evicted while in-flight survives"),
        );
        assert!(
            map.contains_key("busy.example"),
            "in-flight host must survive eviction"
        );
        assert!(map.contains_key("new-host.example"));
        drop(inflight);
    }

    #[test]
    fn host_semaphore_cap_error_when_all_inflight() {
        let mut map = HashMap::new();
        let held: Vec<_> = (0..2)
            .map(|i| {
                acquire_host_semaphore(&mut map, &format!("busy-{i}.example"), 2)
                    .expect("under cap")
            })
            .collect();

        let result = acquire_host_semaphore(&mut map, "new-host.example", 2);
        assert!(
            result.is_err(),
            "cap must still reject when every entry has an in-flight fetch"
        );
        drop(held);
    }

    /// A host that is already mapped must get the SAME semaphore back, even
    /// when the map is at the cap; only new hosts are subject to the cap.
    #[test]
    fn host_semaphore_reuses_existing_entry() {
        let mut map = HashMap::new();
        let first = acquire_host_semaphore(&mut map, "crl.example", 1).expect("under cap");
        let second = acquire_host_semaphore(&mut map, "crl.example", 1)
            .expect("a known host must never hit the cap");
        assert!(
            Arc::ptr_eq(&first, &second),
            "same host must share one semaphore"
        );
        assert_eq!(map.len(), 1);
    }

    /// Auto mode follows `next_update`; fixed-interval mode follows
    /// `fetched_at + interval`.
    #[test]
    fn should_refresh_cached_honours_next_update_and_fixed_interval() {
        let now = SystemTime::now();
        let fresh = CachedCrl::__test_synthetic(now);
        let stale = CachedCrl::__test_stale(UNIX_EPOCH);

        assert!(!should_refresh_cached(&fresh, now, None));
        assert!(should_refresh_cached(&stale, now, None));

        let interval = Some(Duration::from_secs(3600));
        assert!(!should_refresh_cached(&fresh, now, interval));
        assert!(should_refresh_cached(&stale, now, interval));
    }
}