1use std::{
25 collections::{HashMap, HashSet},
26 num::NonZeroU32,
27 pin::Pin,
28 sync::{Arc, Mutex},
29 time::{Duration, SystemTime, UNIX_EPOCH},
30};
31
32use arc_swap::ArcSwap;
33use governor::{DefaultDirectRateLimiter, Quota, RateLimiter};
34use rustls::{
35 DigitallySignedStruct, DistinguishedName, Error as TlsError, RootCertStore, SignatureScheme,
36 client::danger::HandshakeSignatureValid,
37 pki_types::{CertificateDer, CertificateRevocationListDer, UnixTime},
38 server::{
39 WebPkiClientVerifier,
40 danger::{ClientCertVerified, ClientCertVerifier},
41 },
42};
43use tokio::{
44 net::lookup_host,
45 sync::{RwLock, Semaphore, mpsc},
46 task::JoinSet,
47 time::{Instant, Sleep},
48};
49use tokio_util::sync::CancellationToken;
50use url::Url;
51use x509_parser::{
52 extensions::{DistributionPointName, GeneralName, ParsedExtension},
53 prelude::{FromDer, X509Certificate},
54 revocation_list::CertificateRevocationList,
55};
56
57use crate::{
58 auth::MtlsConfig,
59 error::McpxError,
60 ssrf::{check_scheme, ip_block_reason, sanitized_url_for_log},
61};
62
/// Overall deadline for the startup CRL fetch pass in [`bootstrap_fetch`];
/// fetches still pending when it elapses are abandoned.
const BOOTSTRAP_TIMEOUT: Duration = Duration::from_secs(10);
/// Lower bound applied by [`clamp_refresh`] to every refresh delay.
const MIN_AUTO_REFRESH: Duration = Duration::from_mins(10);
/// Upper bound applied by [`clamp_refresh`]; also the delay used by
/// [`next_refresh_delay`] when no cached CRL advertises a `next_update`.
const MAX_AUTO_REFRESH: Duration = Duration::from_hours(24);
/// Connect timeout configured on the CRL HTTP clients.
const CRL_CONNECT_TIMEOUT: Duration = Duration::from_secs(3);
69
/// One fetched certificate revocation list together with the metadata the
/// refresher uses to decide when to fetch it again.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub struct CachedCrl {
    /// Raw DER bytes of the CRL as handed to the rustls verifier builder.
    pub der: CertificateRevocationListDer<'static>,
    /// The CRL's "last update" time, as parsed by [`parse_crl_metadata`].
    pub this_update: SystemTime,
    /// The CRL's "next update" time, if the CRL carries one.
    pub next_update: Option<SystemTime>,
    /// Wall-clock time at which this entry was constructed.
    pub fetched_at: SystemTime,
    /// URL (or synthetic identifier) this CRL was obtained from.
    pub source_url: String,
}
85
/// Newtype around the current rustls client-certificate verifier so it can be
/// stored in (and atomically replaced through) an `ArcSwap`.
pub(crate) struct VerifierHandle(pub Arc<dyn ClientCertVerifier>);
87
88impl std::fmt::Debug for VerifierHandle {
89 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
90 f.debug_struct("VerifierHandle").finish_non_exhaustive()
91 }
92}
93
/// Shared state behind dynamic CRL-backed client-certificate verification:
/// the CRL cache, the verifier built from it, and the machinery used to
/// discover and fetch further CRLs.
#[allow(
    missing_debug_implementations,
    reason = "contains ArcSwap and dyn verifier internals"
)]
#[non_exhaustive]
pub struct CrlSet {
    /// Verifier currently in use; replaced wholesale whenever the cache changes.
    inner_verifier: ArcSwap<VerifierHandle>,
    /// Cached CRLs keyed by the URL they were fetched from.
    pub cache: RwLock<HashMap<String, CachedCrl>>,
    /// Trust anchors passed to every verifier rebuild.
    pub roots: Arc<RootCertStore>,
    /// mTLS / CRL configuration this set was created with.
    pub config: MtlsConfig,
    /// Sender on which newly discovered CRL distribution-point URLs are queued.
    pub discover_tx: mpsc::UnboundedSender<String>,
    /// HTTP client used for all CRL downloads.
    client: reqwest::Client,
    /// URLs that have already been queued for discovery (or were pre-cached).
    seen_urls: Mutex<HashSet<String>>,
    /// Set of URLs with an entry in `cache`, kept behind a synchronous mutex so
    /// non-async code can consult it.
    cached_urls: Mutex<HashSet<String>>,
    /// Limits the total number of concurrent CRL fetches.
    global_fetch_sem: Arc<Semaphore>,
    /// Per-host semaphores used by `gated_fetch`, keyed by URL host.
    host_semaphores: Arc<tokio::sync::Mutex<HashMap<String, Arc<Semaphore>>>>,
    /// Per-minute rate limiter applied to newly discovered URLs.
    discovery_limiter: Arc<DefaultDirectRateLimiter>,
    /// Maximum accepted CRL response body size, in bytes.
    max_response_bytes: u64,
    /// Time of the last "cap exceeded" warning per map name, used to throttle
    /// that log line.
    last_cap_warn: Mutex<HashMap<&'static str, Instant>>,
}
136
137impl CrlSet {
138 fn new(
139 roots: Arc<RootCertStore>,
140 config: MtlsConfig,
141 discover_tx: mpsc::UnboundedSender<String>,
142 initial_cache: HashMap<String, CachedCrl>,
143 ) -> Result<Arc<Self>, McpxError> {
144 let allowlist = Arc::new(crate::ssrf::CompiledSsrfAllowlist::default());
152 let resolver: Arc<dyn reqwest::dns::Resolve> =
153 Arc::new(crate::ssrf_resolver::SsrfScreeningResolver::new(
154 Arc::clone(&allowlist),
155 #[cfg(any(test, feature = "test-helpers"))]
156 Arc::new(std::sync::atomic::AtomicBool::new(false)),
157 #[cfg(not(any(test, feature = "test-helpers")))]
158 (),
159 ));
160
161 let client = reqwest::Client::builder()
162 .no_proxy()
164 .dns_resolver(Arc::clone(&resolver))
165 .timeout(config.crl_fetch_timeout)
166 .connect_timeout(CRL_CONNECT_TIMEOUT)
167 .tcp_keepalive(None)
168 .redirect(reqwest::redirect::Policy::none())
169 .user_agent(format!("rmcp-server-kit/{}", env!("CARGO_PKG_VERSION")))
170 .build()
171 .map_err(|error| McpxError::Startup(format!("CRL HTTP client init: {error}")))?;
172
173 let initial_verifier = rebuild_verifier(&roots, &config, &initial_cache)?;
174 let seen_urls = initial_cache.keys().cloned().collect::<HashSet<_>>();
175 let cached_urls = seen_urls.clone();
176
177 let concurrency = config.crl_max_concurrent_fetches.max(1);
178 let global_fetch_sem = Arc::new(Semaphore::new(concurrency));
179 let host_semaphores = Arc::new(tokio::sync::Mutex::new(HashMap::new()));
180
181 let rate =
182 NonZeroU32::new(config.crl_discovery_rate_per_min.max(1)).unwrap_or(NonZeroU32::MIN);
183 let discovery_limiter = Arc::new(RateLimiter::direct(Quota::per_minute(rate)));
184
185 let max_response_bytes = config.crl_max_response_bytes;
186
187 Ok(Arc::new(Self {
188 inner_verifier: ArcSwap::from_pointee(VerifierHandle(initial_verifier)),
189 cache: RwLock::new(initial_cache),
190 roots,
191 config,
192 discover_tx,
193 client,
194 seen_urls: Mutex::new(seen_urls),
195 cached_urls: Mutex::new(cached_urls),
196 global_fetch_sem,
197 host_semaphores,
198 discovery_limiter,
199 max_response_bytes,
200 last_cap_warn: Mutex::new(HashMap::new()),
201 }))
202 }
203
204 fn warn_cap_exceeded_throttled(&self, which: &'static str) {
205 let now = Instant::now();
206 let cooldown = Duration::from_mins(1);
207 let should_warn = match self.last_cap_warn.lock() {
208 Ok(mut guard) => {
209 let should_emit = guard
210 .get(which)
211 .is_none_or(|last| now.saturating_duration_since(*last) >= cooldown);
212 if should_emit {
213 guard.insert(which, now);
214 }
215 should_emit
216 }
217 Err(poisoned) => {
218 let mut guard = poisoned.into_inner();
219 let should_emit = guard
220 .get(which)
221 .is_none_or(|last| now.saturating_duration_since(*last) >= cooldown);
222 if should_emit {
223 guard.insert(which, now);
224 }
225 should_emit
226 }
227 };
228
229 if should_warn {
230 tracing::warn!(which = which, "CRL map cap exceeded; dropping newest entry");
231 }
232 }
233
234 async fn insert_cache_entry(&self, url: String, cached: CachedCrl) -> bool {
235 let inserted = {
242 let mut guard = self.cache.write().await;
243 if guard.len() >= self.config.crl_max_cache_entries && !guard.contains_key(&url) {
244 false
245 } else {
246 guard.insert(url.clone(), cached);
247 true
248 }
249 };
250
251 if inserted {
252 match self.cached_urls.lock() {
253 Ok(mut cached_urls) => {
254 cached_urls.insert(url);
255 }
256 Err(poisoned) => {
257 poisoned.into_inner().insert(url);
258 }
259 }
260 } else {
261 self.warn_cap_exceeded_throttled("cache");
262 }
263
264 inserted
265 }
266
267 pub async fn force_refresh(&self) -> Result<(), McpxError> {
273 let urls = {
274 let cache = self.cache.read().await;
275 cache.keys().cloned().collect::<Vec<_>>()
276 };
277 self.refresh_urls(urls).await
278 }
279
280 async fn refresh_due_urls(&self) -> Result<(), McpxError> {
281 let now = SystemTime::now();
282 let urls = {
283 let cache = self.cache.read().await;
284 cache
285 .iter()
286 .filter(|(_, cached)| {
287 should_refresh_cached(cached, now, self.config.crl_refresh_interval)
288 })
289 .map(|(url, _)| url.clone())
290 .collect::<Vec<_>>()
291 };
292
293 if urls.is_empty() {
294 return Ok(());
295 }
296
297 self.refresh_urls(urls).await
298 }
299
300 async fn refresh_urls(&self, urls: Vec<String>) -> Result<(), McpxError> {
301 let results = self.fetch_url_results(urls).await;
302 let now = SystemTime::now();
303 let mut cache = self.cache.write().await;
304 let mut changed = false;
305
306 for (url, result) in results {
307 match result {
308 Ok(cached) => {
309 if cache.len() >= self.config.crl_max_cache_entries && !cache.contains_key(&url)
310 {
311 drop(cache);
312 self.warn_cap_exceeded_throttled("cache");
313 cache = self.cache.write().await;
314 continue;
315 }
316 cache.insert(url.clone(), cached);
317 changed = true;
318 match self.cached_urls.lock() {
319 Ok(mut cached_urls) => {
320 cached_urls.insert(url);
321 }
322 Err(poisoned) => {
323 poisoned.into_inner().insert(url);
324 }
325 }
326 }
327 Err(error) => {
328 let remove_entry = cache.get(&url).is_some_and(|existing| {
329 existing
330 .next_update
331 .and_then(|next| next.checked_add(self.config.crl_stale_grace))
332 .is_some_and(|deadline| now > deadline)
333 });
334 tracing::warn!(url = %url, error = %error, "CRL refresh failed");
335 if remove_entry {
336 cache.remove(&url);
337 changed = true;
338 match self.cached_urls.lock() {
339 Ok(mut cached_urls) => {
340 cached_urls.remove(&url);
341 }
342 Err(poisoned) => {
343 poisoned.into_inner().remove(&url);
344 }
345 }
346 match self.seen_urls.lock() {
347 Ok(mut seen_urls) => {
348 seen_urls.remove(&url);
349 }
350 Err(poisoned) => {
351 poisoned.into_inner().remove(&url);
352 }
353 }
354 }
355 }
356 }
357 }
358
359 if changed {
360 self.swap_verifier_from_cache(&cache)?;
361 }
362 drop(cache);
363
364 Ok(())
365 }
366
367 async fn fetch_and_store_url(&self, url: String) -> Result<(), McpxError> {
368 let cached = gated_fetch(
369 &self.client,
370 &self.global_fetch_sem,
371 &self.host_semaphores,
372 &url,
373 self.config.crl_allow_http,
374 self.max_response_bytes,
375 self.config.crl_max_host_semaphores,
376 )
377 .await?;
378 if !self.insert_cache_entry(url, cached).await {
379 return Ok(());
380 }
381 let cache = self.cache.read().await;
382 self.swap_verifier_from_cache(&cache)?;
383 Ok(())
384 }
385
386 fn note_discovered_urls(&self, urls: &[String]) -> bool {
387 let mut missing_cached = false;
396
397 let candidates: Vec<String> = match self.seen_urls.lock() {
408 Ok(seen) => urls
409 .iter()
410 .filter(|url| !seen.contains(*url))
411 .cloned()
412 .collect(),
413 Err(_) => Vec::new(),
414 };
415
416 for url in candidates {
423 if self.discovery_limiter.check().is_err() {
424 tracing::warn!(
425 url = %url,
426 "discovery_rate_limited: dropped CDP URL beyond per-minute cap (will be retried on next handshake observing this URL)"
427 );
428 continue;
429 }
430 if self.discover_tx.send(url.clone()).is_err() {
431 tracing::debug!(
434 url = %url,
435 "discover channel closed; dropping CDP URL without marking seen"
436 );
437 continue;
438 }
439 let mut guard = self
441 .seen_urls
442 .lock()
443 .unwrap_or_else(std::sync::PoisonError::into_inner);
444 if guard.len() >= self.config.crl_max_seen_urls {
445 self.warn_cap_exceeded_throttled("seen_urls");
446 break;
447 }
448 guard.insert(url);
449 }
450
451 if self.config.crl_deny_on_unavailable {
452 let cached = self
453 .cached_urls
454 .lock()
455 .ok()
456 .map(|guard| guard.clone())
457 .unwrap_or_default();
458 missing_cached = urls.iter().any(|url| !cached.contains(url));
459 }
460
461 missing_cached
462 }
463
464 #[doc(hidden)]
470 pub fn __test_with_prepopulated_crls(
471 roots: Arc<RootCertStore>,
472 config: MtlsConfig,
473 prefilled_crls: Vec<CertificateRevocationListDer<'static>>,
474 ) -> Result<Arc<Self>, McpxError> {
475 let (discover_tx, discover_rx) = mpsc::unbounded_channel();
476 drop(discover_rx);
477
478 let mut initial_cache = HashMap::new();
479 for (index, der) in prefilled_crls.into_iter().enumerate() {
480 let source_url = format!("memory://crl/{index}");
481 let (this_update, next_update) = parse_crl_metadata(der.as_ref())?;
482 initial_cache.insert(
483 source_url.clone(),
484 CachedCrl {
485 der,
486 this_update,
487 next_update,
488 fetched_at: SystemTime::now(),
489 source_url,
490 },
491 );
492 }
493
494 Self::new(roots, config, discover_tx, initial_cache)
495 }
496
497 #[doc(hidden)]
509 pub fn __test_with_kept_receiver(
510 roots: Arc<RootCertStore>,
511 config: MtlsConfig,
512 prefilled_crls: Vec<CertificateRevocationListDer<'static>>,
513 ) -> Result<(Arc<Self>, mpsc::UnboundedReceiver<String>), McpxError> {
514 let (discover_tx, discover_rx) = mpsc::unbounded_channel();
515
516 let mut initial_cache = HashMap::new();
517 for (index, der) in prefilled_crls.into_iter().enumerate() {
518 let source_url = format!("memory://crl/{index}");
519 let (this_update, next_update) = parse_crl_metadata(der.as_ref())?;
520 initial_cache.insert(
521 source_url.clone(),
522 CachedCrl {
523 der,
524 this_update,
525 next_update,
526 fetched_at: SystemTime::now(),
527 source_url,
528 },
529 );
530 }
531
532 let crl_set = Self::new(roots, config, discover_tx, initial_cache)?;
533 Ok((crl_set, discover_rx))
534 }
535
536 #[doc(hidden)]
541 pub fn __test_check_discovery_rate(&self, urls: &[String]) -> (usize, usize) {
542 let mut accepted = 0usize;
543 let mut dropped = 0usize;
544 for url in urls {
545 if self.discovery_limiter.check().is_ok() {
546 let _ = self.discover_tx.send(url.clone());
547 accepted += 1;
548 } else {
549 dropped += 1;
550 }
551 }
552 (accepted, dropped)
553 }
554
555 #[doc(hidden)]
559 pub fn __test_note_discovered_urls(&self, urls: &[String]) -> bool {
560 let missing_cached = self.note_discovered_urls(urls);
561 if self.discover_tx.is_closed() {
562 match self.seen_urls.lock() {
563 Ok(mut guard) => {
564 for url in urls {
565 if guard.contains(url) {
566 continue;
567 }
568 if guard.len() >= self.config.crl_max_seen_urls {
569 self.warn_cap_exceeded_throttled("seen_urls");
570 break;
571 }
572 guard.insert(url.clone());
573 }
574 }
575 Err(poisoned) => {
576 let mut guard = poisoned.into_inner();
577 for url in urls {
578 if guard.contains(url) {
579 continue;
580 }
581 if guard.len() >= self.config.crl_max_seen_urls {
582 self.warn_cap_exceeded_throttled("seen_urls");
583 break;
584 }
585 guard.insert(url.clone());
586 }
587 }
588 }
589 }
590 missing_cached
591 }
592
593 #[doc(hidden)]
598 pub fn __test_is_seen(&self, url: &str) -> bool {
599 match self.seen_urls.lock() {
600 Ok(seen) => seen.contains(url),
601 Err(_) => false,
602 }
603 }
604
605 #[cfg(any(test, feature = "test-helpers"))]
608 #[doc(hidden)]
609 pub fn __test_host_semaphore_count(&self) -> usize {
610 self.host_semaphores
611 .try_lock()
612 .map_or(0, |guard| guard.len())
613 }
614
615 #[cfg(any(test, feature = "test-helpers"))]
617 #[doc(hidden)]
618 pub fn __test_cache_len(&self) -> usize {
619 self.cache.try_read().map_or(0, |guard| guard.len())
620 }
621
622 #[cfg(any(test, feature = "test-helpers"))]
624 #[doc(hidden)]
625 pub fn __test_cache_contains(&self, url: &str) -> bool {
626 self.cache
627 .try_read()
628 .is_ok_and(|guard| guard.contains_key(url))
629 }
630
631 #[cfg(any(test, feature = "test-helpers"))]
638 #[doc(hidden)]
639 pub async fn __test_trigger_fetch(&self, url: &str) -> Result<(), McpxError> {
640 if let Err(error) = gated_fetch(
641 &self.client,
642 &self.global_fetch_sem,
643 &self.host_semaphores,
644 url,
645 self.config.crl_allow_http,
646 self.max_response_bytes,
647 self.config.crl_max_host_semaphores,
648 )
649 .await
650 {
651 if error
652 .to_string()
653 .contains("crl_host_semaphore_cap_exceeded")
654 {
655 Err(error)
656 } else {
657 Ok(())
658 }
659 } else {
660 Ok(())
661 }
662 }
663
664 #[cfg(any(test, feature = "test-helpers"))]
676 #[doc(hidden)]
677 pub async fn __test_insert_cache(&self, url: &str, cached: CachedCrl) {
678 let _ = self.insert_cache_entry(url.to_owned(), cached).await;
679 }
680
681 #[cfg(any(test, feature = "test-helpers"))]
686 #[doc(hidden)]
687 pub async fn __test_trigger_refresh_url(&self, url: &str) -> Result<(), McpxError> {
688 self.refresh_urls(vec![url.to_owned()]).await
689 }
690
691 async fn fetch_url_results(
692 &self,
693 urls: Vec<String>,
694 ) -> Vec<(String, Result<CachedCrl, McpxError>)> {
695 let mut tasks = JoinSet::new();
696 for url in urls {
697 let client = self.client.clone();
698 let global_sem = Arc::clone(&self.global_fetch_sem);
699 let host_map = Arc::clone(&self.host_semaphores);
700 let allow_http = self.config.crl_allow_http;
701 let max_bytes = self.max_response_bytes;
702 let max_host_semaphores = self.config.crl_max_host_semaphores;
703 tasks.spawn(async move {
704 let result = gated_fetch(
705 &client,
706 &global_sem,
707 &host_map,
708 &url,
709 allow_http,
710 max_bytes,
711 max_host_semaphores,
712 )
713 .await;
714 (url, result)
715 });
716 }
717
718 let mut results = Vec::new();
719 while let Some(joined) = tasks.join_next().await {
720 match joined {
721 Ok(result) => results.push(result),
722 Err(error) => {
723 tracing::warn!(error = %error, "CRL refresh task join failed");
724 }
725 }
726 }
727
728 results
729 }
730
731 fn swap_verifier_from_cache(
732 &self,
733 cache: &impl std::ops::Deref<Target = HashMap<String, CachedCrl>>,
734 ) -> Result<(), McpxError> {
735 let verifier = rebuild_verifier(&self.roots, &self.config, cache)?;
736 self.inner_verifier
737 .store(Arc::new(VerifierHandle(verifier)));
738 Ok(())
739 }
740}
741
742impl CachedCrl {
743 #[cfg(any(test, feature = "test-helpers"))]
747 #[doc(hidden)]
748 #[must_use]
749 pub fn __test_synthetic(now: SystemTime) -> Self {
750 Self {
751 der: CertificateRevocationListDer::from(vec![0x30, 0x00]),
752 this_update: now,
753 next_update: now.checked_add(Duration::from_hours(24)),
754 fetched_at: now,
755 source_url: "test://synthetic".to_owned(),
756 }
757 }
758
759 #[cfg(any(test, feature = "test-helpers"))]
763 #[doc(hidden)]
764 #[must_use]
765 pub fn __test_stale(reference_past: SystemTime) -> Self {
766 Self {
767 der: CertificateRevocationListDer::from(vec![0x30, 0x00]),
768 this_update: reference_past,
769 next_update: Some(reference_past),
770 fetched_at: reference_past,
771 source_url: "test://stale".to_owned(),
772 }
773 }
774}
775
/// rustls client-certificate verifier that delegates to whichever verifier
/// the shared [`CrlSet`] currently publishes.
pub struct DynamicClientCertVerifier {
    /// Shared CRL state and current inner verifier.
    inner: Arc<CrlSet>,
    /// Root subject names captured once at construction and returned as the
    /// root hint subjects.
    dn_subjects: Vec<DistinguishedName>,
}
782
783impl DynamicClientCertVerifier {
784 #[must_use]
786 pub fn new(inner: Arc<CrlSet>) -> Self {
787 Self {
788 dn_subjects: inner.roots.subjects(),
789 inner,
790 }
791 }
792}
793
794impl std::fmt::Debug for DynamicClientCertVerifier {
795 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
796 f.debug_struct("DynamicClientCertVerifier")
797 .field("dn_subjects_len", &self.dn_subjects.len())
798 .finish_non_exhaustive()
799 }
800}
801
802impl ClientCertVerifier for DynamicClientCertVerifier {
803 fn offer_client_auth(&self) -> bool {
804 let verifier = self.inner.inner_verifier.load();
805 verifier.0.offer_client_auth()
806 }
807
808 fn client_auth_mandatory(&self) -> bool {
809 let verifier = self.inner.inner_verifier.load();
810 verifier.0.client_auth_mandatory()
811 }
812
813 fn root_hint_subjects(&self) -> &[DistinguishedName] {
814 &self.dn_subjects
815 }
816
817 fn verify_client_cert(
818 &self,
819 end_entity: &CertificateDer<'_>,
820 intermediates: &[CertificateDer<'_>],
821 now: UnixTime,
822 ) -> Result<ClientCertVerified, TlsError> {
823 let mut discovered =
835 extract_cdp_urls(end_entity.as_ref(), self.inner.config.crl_allow_http);
836 for intermediate in intermediates {
837 discovered.extend(extract_cdp_urls(
838 intermediate.as_ref(),
839 self.inner.config.crl_allow_http,
840 ));
841 }
842 discovered.sort();
843 discovered.dedup();
844
845 if self.inner.note_discovered_urls(&discovered) {
846 return Err(TlsError::General(
847 "client certificate revocation status unavailable".to_owned(),
848 ));
849 }
850
851 let verifier = self.inner.inner_verifier.load();
852 verifier
853 .0
854 .verify_client_cert(end_entity, intermediates, now)
855 }
856
857 fn verify_tls12_signature(
858 &self,
859 message: &[u8],
860 cert: &CertificateDer<'_>,
861 dss: &DigitallySignedStruct,
862 ) -> Result<HandshakeSignatureValid, TlsError> {
863 let verifier = self.inner.inner_verifier.load();
864 verifier.0.verify_tls12_signature(message, cert, dss)
865 }
866
867 fn verify_tls13_signature(
868 &self,
869 message: &[u8],
870 cert: &CertificateDer<'_>,
871 dss: &DigitallySignedStruct,
872 ) -> Result<HandshakeSignatureValid, TlsError> {
873 let verifier = self.inner.inner_verifier.load();
874 verifier.0.verify_tls13_signature(message, cert, dss)
875 }
876
877 fn supported_verify_schemes(&self) -> Vec<SignatureScheme> {
878 let verifier = self.inner.inner_verifier.load();
879 verifier.0.supported_verify_schemes()
880 }
881
882 fn requires_raw_public_keys(&self) -> bool {
883 let verifier = self.inner.inner_verifier.load();
884 verifier.0.requires_raw_public_keys()
885 }
886}
887
888#[must_use]
897pub fn extract_cdp_urls(cert_der: &[u8], allow_http: bool) -> Vec<String> {
898 let Ok((_, cert)) = X509Certificate::from_der(cert_der) else {
899 return Vec::new();
900 };
901
902 let mut urls = Vec::new();
903 for ext in cert.extensions() {
904 if let ParsedExtension::CRLDistributionPoints(cdps) = ext.parsed_extension() {
905 for point in cdps.iter() {
906 if let Some(DistributionPointName::FullName(names)) = &point.distribution_point {
907 for name in names {
908 if let GeneralName::URI(uri) = name {
909 let raw = *uri;
910 let Ok(parsed) = Url::parse(raw) else {
911 tracing::debug!(url = ?raw, "CDP URL parse failed; dropped");
915 continue;
916 };
917 if let Err(reason) = check_scheme(&parsed, allow_http) {
918 tracing::debug!(
919 url = %sanitized_url_for_log(&parsed),
920 reason,
921 "CDP URL rejected by scheme guard; dropped"
922 );
923 continue;
924 }
925 urls.push(parsed.into());
926 }
927 }
928 }
929 }
930 }
931 }
932
933 urls
934}
935
936#[allow(
943 clippy::cognitive_complexity,
944 reason = "bootstrap coordinates timeout, parallel fetches, and partial-cache recovery"
945)]
946pub async fn bootstrap_fetch(
947 roots: Arc<RootCertStore>,
948 ca_certs: &[CertificateDer<'static>],
949 config: MtlsConfig,
950) -> Result<(Arc<CrlSet>, mpsc::UnboundedReceiver<String>), McpxError> {
951 let (discover_tx, discover_rx) = mpsc::unbounded_channel();
952
953 let mut urls = ca_certs
954 .iter()
955 .flat_map(|cert| extract_cdp_urls(cert.as_ref(), config.crl_allow_http))
956 .collect::<Vec<_>>();
957 urls.sort();
958 urls.dedup();
959
960 let bootstrap_allowlist = Arc::new(crate::ssrf::CompiledSsrfAllowlist::default());
964 let bootstrap_resolver: Arc<dyn reqwest::dns::Resolve> =
965 Arc::new(crate::ssrf_resolver::SsrfScreeningResolver::new(
966 Arc::clone(&bootstrap_allowlist),
967 #[cfg(any(test, feature = "test-helpers"))]
968 Arc::new(std::sync::atomic::AtomicBool::new(false)),
969 #[cfg(not(any(test, feature = "test-helpers")))]
970 (),
971 ));
972
973 let client = reqwest::Client::builder()
974 .no_proxy()
976 .dns_resolver(Arc::clone(&bootstrap_resolver))
977 .timeout(config.crl_fetch_timeout)
978 .connect_timeout(CRL_CONNECT_TIMEOUT)
979 .tcp_keepalive(None)
980 .redirect(reqwest::redirect::Policy::none())
981 .user_agent(format!("rmcp-server-kit/{}", env!("CARGO_PKG_VERSION")))
982 .build()
983 .map_err(|error| McpxError::Startup(format!("CRL HTTP client init: {error}")))?;
984
985 let bootstrap_concurrency = config.crl_max_concurrent_fetches.max(1);
989 let global_sem = Arc::new(Semaphore::new(bootstrap_concurrency));
990 let host_semaphores = Arc::new(tokio::sync::Mutex::new(HashMap::new()));
991 let allow_http = config.crl_allow_http;
992 let max_bytes = config.crl_max_response_bytes;
993 let max_host_semaphores = config.crl_max_host_semaphores;
994
995 let mut initial_cache = HashMap::new();
996 let mut tasks = JoinSet::new();
997 for url in &urls {
998 let client = client.clone();
999 let url = url.clone();
1000 let global_sem = Arc::clone(&global_sem);
1001 let host_semaphores = Arc::clone(&host_semaphores);
1002 tasks.spawn(async move {
1003 let result = gated_fetch(
1004 &client,
1005 &global_sem,
1006 &host_semaphores,
1007 &url,
1008 allow_http,
1009 max_bytes,
1010 max_host_semaphores,
1011 )
1012 .await;
1013 (url, result)
1014 });
1015 }
1016
1017 let timeout: Sleep = tokio::time::sleep(BOOTSTRAP_TIMEOUT);
1018 tokio::pin!(timeout);
1019
1020 while !tasks.is_empty() {
1021 tokio::select! {
1025 () = &mut timeout => {
1026 tracing::warn!("CRL bootstrap timed out after {:?}", BOOTSTRAP_TIMEOUT);
1027 break;
1028 }
1029 maybe_joined = tasks.join_next() => {
1030 let Some(joined) = maybe_joined else {
1031 break;
1032 };
1033 match joined {
1034 Ok((url, Ok(cached))) => {
1035 initial_cache.insert(url, cached);
1036 }
1037 Ok((url, Err(error))) => {
1038 tracing::warn!(url = %url, error = %error, "CRL bootstrap fetch failed");
1039 }
1040 Err(error) => {
1041 tracing::warn!(error = %error, "CRL bootstrap task join failed");
1042 }
1043 }
1044 }
1045 }
1046 }
1047
1048 let set = CrlSet::new(roots, config, discover_tx, initial_cache)?;
1049 Ok((set, discover_rx))
1050}
1051
1052#[allow(
1054 clippy::cognitive_complexity,
1055 reason = "refresher loop intentionally handles shutdown, timer, and discovery in one select"
1056)]
1057pub async fn run_crl_refresher(
1058 set: Arc<CrlSet>,
1059 mut discover_rx: mpsc::UnboundedReceiver<String>,
1060 shutdown: CancellationToken,
1061) {
1062 let mut refresh_sleep = schedule_next_refresh(&set).await;
1063
1064 loop {
1065 tokio::select! {
1069 () = shutdown.cancelled() => {
1070 break;
1071 }
1072 () = &mut refresh_sleep => {
1073 if let Err(error) = set.refresh_due_urls().await {
1074 tracing::warn!(error = %error, "CRL periodic refresh failed");
1075 }
1076 refresh_sleep = schedule_next_refresh(&set).await;
1077 }
1078 maybe_url = discover_rx.recv() => {
1079 let Some(url) = maybe_url else {
1080 break;
1081 };
1082 if let Err(error) = set.fetch_and_store_url(url.clone()).await {
1083 tracing::warn!(url = %url, error = %error, "CRL discovery fetch failed");
1084 }
1085 refresh_sleep = schedule_next_refresh(&set).await;
1086 }
1087 }
1088 }
1089}
1090
1091pub fn rebuild_verifier<S: std::hash::BuildHasher>(
1097 roots: &Arc<RootCertStore>,
1098 config: &MtlsConfig,
1099 cache: &HashMap<String, CachedCrl, S>,
1100) -> Result<Arc<dyn ClientCertVerifier>, McpxError> {
1101 let mut builder = WebPkiClientVerifier::builder(Arc::clone(roots));
1102
1103 if !cache.is_empty() {
1104 let crls = cache
1105 .values()
1106 .map(|cached| cached.der.clone())
1107 .collect::<Vec<_>>();
1108 builder = builder.with_crls(crls);
1109 }
1110 if config.crl_end_entity_only {
1111 builder = builder.only_check_end_entity_revocation();
1112 }
1113 if !config.crl_deny_on_unavailable {
1114 builder = builder.allow_unknown_revocation_status();
1115 }
1116 if config.crl_enforce_expiration {
1117 builder = builder.enforce_revocation_expiration();
1118 }
1119 if !config.required {
1120 builder = builder.allow_unauthenticated();
1121 }
1122
1123 builder
1124 .build()
1125 .map_err(|error| McpxError::Tls(format!("mTLS verifier error: {error}")))
1126}
1127
1128pub fn parse_crl_metadata(der: &[u8]) -> Result<(SystemTime, Option<SystemTime>), McpxError> {
1134 let (_, crl) = CertificateRevocationList::from_der(der)
1135 .map_err(|error| McpxError::Tls(format!("invalid CRL DER: {error:?}")))?;
1136
1137 Ok((
1138 asn1_time_to_system_time(crl.last_update()),
1139 crl.next_update().map(asn1_time_to_system_time),
1140 ))
1141}
1142
1143async fn schedule_next_refresh(set: &CrlSet) -> Pin<Box<Sleep>> {
1144 let duration = next_refresh_delay(set).await;
1145 boxed_sleep(duration)
1146}
1147
1148fn boxed_sleep(duration: Duration) -> Pin<Box<Sleep>> {
1149 Box::pin(tokio::time::sleep_until(Instant::now() + duration))
1150}
1151
1152async fn next_refresh_delay(set: &CrlSet) -> Duration {
1153 if let Some(interval) = set.config.crl_refresh_interval {
1154 return clamp_refresh(interval);
1155 }
1156
1157 let now = SystemTime::now();
1158 let cache = set.cache.read().await;
1159 let mut next = MAX_AUTO_REFRESH;
1160
1161 for cached in cache.values() {
1162 if let Some(next_update) = cached.next_update {
1163 let duration = next_update.duration_since(now).unwrap_or(Duration::ZERO);
1164 next = next.min(clamp_refresh(duration));
1165 }
1166 }
1167 drop(cache);
1168
1169 next
1170}
1171
1172fn acquire_host_semaphore(
1182 map: &mut HashMap<String, Arc<Semaphore>>,
1183 host_key: &str,
1184 max_host_semaphores: usize,
1185) -> Result<Arc<Semaphore>, McpxError> {
1186 if !map.contains_key(host_key) {
1187 if map.len() >= max_host_semaphores {
1188 map.retain(|_, semaphore| Arc::strong_count(semaphore) > 1);
1190 }
1191 if map.len() >= max_host_semaphores {
1192 return Err(McpxError::Config(
1193 "crl_host_semaphore_cap_exceeded: too many distinct CRL hosts in flight".to_owned(),
1194 ));
1195 }
1196 map.insert(host_key.to_owned(), Arc::new(Semaphore::new(1)));
1197 }
1198 match map.get(host_key) {
1199 Some(semaphore) => Ok(Arc::clone(semaphore)),
1200 None => Err(McpxError::Tls(
1201 "CRL host semaphore missing after insertion".to_owned(),
1202 )),
1203 }
1204}
1205
1206async fn gated_fetch(
1214 client: &reqwest::Client,
1215 global_sem: &Arc<Semaphore>,
1216 host_semaphores: &Arc<tokio::sync::Mutex<HashMap<String, Arc<Semaphore>>>>,
1217 url: &str,
1218 allow_http: bool,
1219 max_bytes: u64,
1220 max_host_semaphores: usize,
1221) -> Result<CachedCrl, McpxError> {
1222 let host_key = Url::parse(url)
1223 .ok()
1224 .and_then(|u| u.host_str().map(str::to_owned))
1225 .unwrap_or_else(|| url.to_owned());
1226
1227 let host_sem = {
1228 let mut map = host_semaphores.lock().await;
1229 acquire_host_semaphore(&mut map, &host_key, max_host_semaphores)?
1230 };
1231
1232 let _global_permit = Arc::clone(global_sem)
1233 .acquire_owned()
1234 .await
1235 .map_err(|error| McpxError::Tls(format!("CRL global semaphore closed: {error}")))?;
1236 let _host_permit = host_sem
1237 .acquire_owned()
1238 .await
1239 .map_err(|error| McpxError::Tls(format!("CRL host semaphore closed: {error}")))?;
1240
1241 fetch_crl(client, url, allow_http, max_bytes).await
1242}
1243
1244async fn fetch_crl(
1245 client: &reqwest::Client,
1246 url: &str,
1247 allow_http: bool,
1248 max_bytes: u64,
1249) -> Result<CachedCrl, McpxError> {
1250 let parsed =
1251 Url::parse(url).map_err(|error| McpxError::Tls(format!("CRL URL parse {url}: {error}")))?;
1252
1253 if let Err(reason) = check_scheme(&parsed, allow_http) {
1254 let sanitized = sanitized_url_for_log(&parsed);
1257 tracing::warn!(url = %sanitized, reason, "CRL fetch denied: scheme");
1258 return Err(McpxError::Tls(format!(
1259 "CRL scheme rejected ({reason}): {sanitized}"
1260 )));
1261 }
1262
1263 let host = parsed
1264 .host_str()
1265 .ok_or_else(|| McpxError::Tls(format!("CRL URL has no host: {url}")))?;
1266 let port = parsed
1267 .port_or_known_default()
1268 .ok_or_else(|| McpxError::Tls(format!("CRL URL has no known port: {url}")))?;
1269
1270 let addrs = lookup_host((host, port))
1271 .await
1272 .map_err(|error| McpxError::Tls(format!("CRL DNS resolution {url}: {error}")))?;
1273
1274 let mut any_addr = false;
1275 for addr in addrs {
1276 any_addr = true;
1277 if let Some(reason) = ip_block_reason(addr.ip()) {
1278 tracing::warn!(
1279 url = %url,
1280 resolved_ip = %addr.ip(),
1281 reason,
1282 "CRL fetch denied: blocked IP"
1283 );
1284 return Err(McpxError::Tls(format!(
1285 "CRL host resolved to blocked IP ({reason}): {url}"
1286 )));
1287 }
1288 }
1289 if !any_addr {
1290 return Err(McpxError::Tls(format!(
1291 "CRL DNS resolution returned no addresses: {url}"
1292 )));
1293 }
1294
1295 let mut response = client
1296 .get(url)
1297 .send()
1298 .await
1299 .map_err(|error| McpxError::Tls(format!("CRL fetch {url}: {error}")))?
1300 .error_for_status()
1301 .map_err(|error| McpxError::Tls(format!("CRL fetch {url}: {error}")))?;
1302
1303 let initial_capacity = usize::try_from(max_bytes.min(64 * 1024)).unwrap_or(64 * 1024);
1306 let mut body: Vec<u8> = Vec::with_capacity(initial_capacity);
1307 while let Some(chunk) = response
1308 .chunk()
1309 .await
1310 .map_err(|error| McpxError::Tls(format!("CRL read {url}: {error}")))?
1311 {
1312 let chunk_len = u64::try_from(chunk.len()).unwrap_or(u64::MAX);
1313 let body_len = u64::try_from(body.len()).unwrap_or(u64::MAX);
1314 if body_len.saturating_add(chunk_len) > max_bytes {
1315 return Err(McpxError::Tls(format!(
1316 "CRL body exceeded cap of {max_bytes} bytes: {url}"
1317 )));
1318 }
1319 body.extend_from_slice(&chunk);
1320 }
1321
1322 let der = CertificateRevocationListDer::from(body);
1323 let (this_update, next_update) = parse_crl_metadata(der.as_ref())?;
1324
1325 Ok(CachedCrl {
1326 der,
1327 this_update,
1328 next_update,
1329 fetched_at: SystemTime::now(),
1330 source_url: url.to_owned(),
1331 })
1332}
1333
1334fn should_refresh_cached(
1335 cached: &CachedCrl,
1336 now: SystemTime,
1337 fixed_interval: Option<Duration>,
1338) -> bool {
1339 if let Some(interval) = fixed_interval {
1340 return cached
1341 .fetched_at
1342 .checked_add(clamp_refresh(interval))
1343 .is_none_or(|deadline| now >= deadline);
1344 }
1345
1346 cached
1347 .next_update
1348 .is_none_or(|next_update| now >= next_update)
1349}
1350
1351fn clamp_refresh(duration: Duration) -> Duration {
1352 duration.clamp(MIN_AUTO_REFRESH, MAX_AUTO_REFRESH)
1353}
1354
1355const MAX_ASN1_TIMESTAMP_SECS: u64 = 253_402_300_799;
1359
1360fn asn1_time_to_system_time(time: x509_parser::time::ASN1Time) -> SystemTime {
1369 let timestamp = time.timestamp();
1370 if timestamp >= 0 {
1371 let seconds = u64::try_from(timestamp)
1372 .unwrap_or(0)
1373 .min(MAX_ASN1_TIMESTAMP_SECS);
1374 UNIX_EPOCH
1375 .checked_add(Duration::from_secs(seconds))
1376 .unwrap_or(UNIX_EPOCH)
1377 } else {
1378 UNIX_EPOCH
1379 .checked_sub(Duration::from_secs(timestamp.unsigned_abs()))
1380 .unwrap_or(UNIX_EPOCH)
1381 }
1382}
1383
#[cfg(test)]
mod tests {
    use x509_parser::time::ASN1Time;

    use super::*;

    /// CRL URL carrying `user:password` userinfo, shared by the tests that
    /// check credentials never leak into errors or log strings.
    const USERINFO_URL: &str = "https://u:p@crl.example/ca.crl";

    /// Builds an `ASN1Time` from Unix seconds, panicking if the value is rejected.
    fn asn1(timestamp: i64) -> ASN1Time {
        ASN1Time::from_timestamp(timestamp).expect("valid ASN.1 timestamp")
    }

    /// `fetch_crl` must refuse a URL with userinfo, and the resulting error
    /// must name the rejection reason without repeating the credentials.
    #[tokio::test]
    async fn fetch_crl_rejects_userinfo_without_echoing_credentials() {
        // The install result is deliberately discarded.
        let _ = rustls::crypto::ring::default_provider().install_default();

        let http = reqwest::Client::new();
        let outcome = fetch_crl(&http, USERINFO_URL, false, 1024).await;
        let rendered = outcome
            .expect_err("userinfo-bearing CRL URL must be rejected")
            .to_string();

        assert!(
            rendered.contains("userinfo_forbidden"),
            "error must carry the rejection reason: {rendered}"
        );
        assert!(
            !rendered.contains("u:p"),
            "error must not echo the rejected credentials: {rendered}"
        );
    }

    /// The log sanitizer reduces a userinfo-bearing URL to scheme and host.
    #[test]
    fn sanitizer_used_by_rejection_sites_strips_credentials() {
        let url = Url::parse(USERINFO_URL).expect("parse");
        let sanitized = sanitized_url_for_log(&url);

        assert_eq!(sanitized, "https://crl.example");
        assert!(!sanitized.contains("u:p"));
    }

    /// Pre-epoch inputs never land after the epoch; in-range inputs convert
    /// exactly, including the largest accepted timestamp.
    #[test]
    fn asn1_time_clamps_unrepresentable_timestamps() {
        // Year 1500, year 1601, and two seconds before the epoch.
        for before_epoch in [-14_831_769_600_i64, -11_644_473_600, -2] {
            assert!(asn1_time_to_system_time(asn1(before_epoch)) <= UNIX_EPOCH);
        }
        // On Windows the year-1500 value must collapse to the epoch exactly.
        #[cfg(windows)]
        assert_eq!(
            asn1_time_to_system_time(asn1(-14_831_769_600)),
            UNIX_EPOCH
        );

        let ceiling = i64::try_from(MAX_ASN1_TIMESTAMP_SECS).expect("fits in i64");
        let exact_cases = [
            (1_700_000_000_i64, 1_700_000_000_u64),
            (ceiling, MAX_ASN1_TIMESTAMP_SECS),
        ];
        for (timestamp, seconds) in exact_cases {
            assert_eq!(
                asn1_time_to_system_time(asn1(timestamp)),
                UNIX_EPOCH + Duration::from_secs(seconds)
            );
        }
    }

    /// With the map full of hosts whose handles were already dropped, a new
    /// host can still be admitted.
    #[test]
    fn host_semaphore_evicts_idle_at_cap() {
        let mut hosts = HashMap::new();
        for i in 0..4 {
            let host = format!("idle-{i}.example");
            let idle = acquire_host_semaphore(&mut hosts, &host, 4).expect("under cap");
            drop(idle);
        }
        assert_eq!(hosts.len(), 4);

        let fresh = acquire_host_semaphore(&mut hosts, "new-host.example", 4)
            .expect("idle eviction frees space for a new host");
        assert!(hosts.contains_key("new-host.example"));
        drop(fresh);
    }

    /// Making room for a new host must not remove a host whose handle is
    /// still held.
    #[test]
    fn host_semaphore_keeps_inflight_at_cap() {
        let mut hosts = HashMap::new();
        let inflight = acquire_host_semaphore(&mut hosts, "busy.example", 3).expect("under cap");
        for i in 0..2 {
            let host = format!("idle-{i}.example");
            let idle = acquire_host_semaphore(&mut hosts, &host, 3).expect("under cap");
            drop(idle);
        }
        assert_eq!(hosts.len(), 3);

        let fresh = acquire_host_semaphore(&mut hosts, "new-host.example", 3)
            .expect("idle entries evicted while in-flight survives");
        drop(fresh);

        assert!(
            hosts.contains_key("busy.example"),
            "in-flight host must survive eviction"
        );
        assert!(hosts.contains_key("new-host.example"));
        drop(inflight);
    }

    /// When every entry at the cap still has its handle held, a new host is
    /// rejected.
    #[test]
    fn host_semaphore_cap_error_when_all_inflight() {
        let mut hosts = HashMap::new();
        let mut held = Vec::new();
        for i in 0..2 {
            let host = format!("busy-{i}.example");
            held.push(acquire_host_semaphore(&mut hosts, &host, 2).expect("under cap"));
        }

        let attempt = acquire_host_semaphore(&mut hosts, "new-host.example", 2);
        assert!(
            attempt.is_err(),
            "cap must still reject when every entry has an in-flight fetch"
        );
        drop(held);
    }
}