1use std::{
20 collections::{HashMap, HashSet},
21 num::NonZeroU32,
22 pin::Pin,
23 sync::{Arc, Mutex},
24 time::{Duration, SystemTime, UNIX_EPOCH},
25};
26
27use arc_swap::ArcSwap;
28use governor::{DefaultDirectRateLimiter, Quota, RateLimiter};
29use rustls::{
30 DigitallySignedStruct, DistinguishedName, Error as TlsError, RootCertStore, SignatureScheme,
31 client::danger::HandshakeSignatureValid,
32 pki_types::{CertificateDer, CertificateRevocationListDer, UnixTime},
33 server::{
34 WebPkiClientVerifier,
35 danger::{ClientCertVerified, ClientCertVerifier},
36 },
37};
38use tokio::{
39 net::lookup_host,
40 sync::{RwLock, Semaphore, mpsc},
41 task::JoinSet,
42 time::{Instant, Sleep},
43};
44use tokio_util::sync::CancellationToken;
45use url::Url;
46use x509_parser::{
47 extensions::{DistributionPointName, GeneralName, ParsedExtension},
48 prelude::{FromDer, X509Certificate},
49 revocation_list::CertificateRevocationList,
50};
51
52use crate::{
53 auth::MtlsConfig,
54 error::McpxError,
55 ssrf::{check_scheme, ip_block_reason},
56};
57
/// Overall deadline `bootstrap_fetch` allows for the initial CRL downloads.
const BOOTSTRAP_TIMEOUT: Duration = Duration::from_secs(10);
/// Lower bound applied by `clamp_refresh` to any refresh delay.
const MIN_AUTO_REFRESH: Duration = Duration::from_mins(10);
/// Upper bound applied by `clamp_refresh`; also the starting value in
/// `next_refresh_delay` when no cached CRL suggests an earlier refresh.
const MAX_AUTO_REFRESH: Duration = Duration::from_hours(24);
/// Connect timeout configured on the CRL HTTP clients.
const CRL_CONNECT_TIMEOUT: Duration = Duration::from_secs(3);
64
/// A CRL held in the in-memory cache together with the metadata used to
/// decide when it should be refreshed or dropped.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub struct CachedCrl {
    /// DER-encoded CRL bytes; cloned into the verifier builder on rebuild.
    pub der: CertificateRevocationListDer<'static>,
    /// The CRL's last-update time as returned by `parse_crl_metadata`.
    pub this_update: SystemTime,
    /// The CRL's next-update time, when the CRL carries one.
    pub next_update: Option<SystemTime>,
    /// Wall-clock time at which this entry was created.
    pub fetched_at: SystemTime,
    /// URL (or synthetic identifier in test helpers) the CRL came from.
    pub source_url: String,
}
80
81pub(crate) struct VerifierHandle(pub Arc<dyn ClientCertVerifier>);
82
83impl std::fmt::Debug for VerifierHandle {
84 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
85 f.debug_struct("VerifierHandle").finish_non_exhaustive()
86 }
87}
88
/// Shared CRL state: the cache of fetched CRLs, the verifier built from it,
/// and the bookkeeping used to discover, rate-limit and bound CRL fetches.
#[allow(
    missing_debug_implementations,
    reason = "contains ArcSwap and dyn verifier internals"
)]
#[non_exhaustive]
pub struct CrlSet {
    /// Verifier currently used for handshakes; replaced by
    /// `swap_verifier_from_cache` whenever the cache changes.
    inner_verifier: ArcSwap<VerifierHandle>,
    /// Cached CRLs keyed by source URL.
    pub cache: RwLock<HashMap<String, CachedCrl>>,
    /// Trust anchors passed to the verifier builder on every rebuild.
    pub roots: Arc<RootCertStore>,
    /// mTLS configuration (CRL limits, refresh interval, policy flags).
    pub config: MtlsConfig,
    /// Channel on which newly observed CDP URLs are sent for fetching.
    pub discover_tx: mpsc::UnboundedSender<String>,
    /// HTTP client used for CRL downloads.
    client: reqwest::Client,
    /// URLs already forwarded on `discover_tx` (seeded from the initial cache).
    seen_urls: Mutex<HashSet<String>>,
    /// URLs that have a cache entry; kept in a std `Mutex` so the
    /// synchronous handshake path can consult it without awaiting.
    cached_urls: Mutex<HashSet<String>>,
    /// Bounds the total number of concurrent CRL fetches.
    global_fetch_sem: Arc<Semaphore>,
    /// One single-permit semaphore per host key, created on first use.
    host_semaphores: Arc<tokio::sync::Mutex<HashMap<String, Arc<Semaphore>>>>,
    /// Per-minute rate limit on forwarding discovered URLs.
    discovery_limiter: Arc<DefaultDirectRateLimiter>,
    /// Maximum accepted CRL response body size, in bytes.
    max_response_bytes: u64,
    /// Last time a cap warning was logged, keyed by the map name passed to
    /// `warn_cap_exceeded_throttled`.
    last_cap_warn: Mutex<HashMap<&'static str, Instant>>,
}
125
126impl CrlSet {
127 fn new(
128 roots: Arc<RootCertStore>,
129 config: MtlsConfig,
130 discover_tx: mpsc::UnboundedSender<String>,
131 initial_cache: HashMap<String, CachedCrl>,
132 ) -> Result<Arc<Self>, McpxError> {
133 let client = reqwest::Client::builder()
134 .timeout(config.crl_fetch_timeout)
135 .connect_timeout(CRL_CONNECT_TIMEOUT)
136 .tcp_keepalive(None)
137 .redirect(reqwest::redirect::Policy::none())
138 .user_agent(format!("rmcp-server-kit/{}", env!("CARGO_PKG_VERSION")))
139 .build()
140 .map_err(|error| McpxError::Startup(format!("CRL HTTP client init: {error}")))?;
141
142 let initial_verifier = rebuild_verifier(&roots, &config, &initial_cache)?;
143 let seen_urls = initial_cache.keys().cloned().collect::<HashSet<_>>();
144 let cached_urls = seen_urls.clone();
145
146 let concurrency = config.crl_max_concurrent_fetches.max(1);
147 let global_fetch_sem = Arc::new(Semaphore::new(concurrency));
148 let host_semaphores = Arc::new(tokio::sync::Mutex::new(HashMap::new()));
149
150 let rate =
151 NonZeroU32::new(config.crl_discovery_rate_per_min.max(1)).unwrap_or(NonZeroU32::MIN);
152 let discovery_limiter = Arc::new(RateLimiter::direct(Quota::per_minute(rate)));
153
154 let max_response_bytes = config.crl_max_response_bytes;
155
156 Ok(Arc::new(Self {
157 inner_verifier: ArcSwap::from_pointee(VerifierHandle(initial_verifier)),
158 cache: RwLock::new(initial_cache),
159 roots,
160 config,
161 discover_tx,
162 client,
163 seen_urls: Mutex::new(seen_urls),
164 cached_urls: Mutex::new(cached_urls),
165 global_fetch_sem,
166 host_semaphores,
167 discovery_limiter,
168 max_response_bytes,
169 last_cap_warn: Mutex::new(HashMap::new()),
170 }))
171 }
172
173 fn warn_cap_exceeded_throttled(&self, which: &'static str) {
174 let now = Instant::now();
175 let cooldown = Duration::from_mins(1);
176 let should_warn = match self.last_cap_warn.lock() {
177 Ok(mut guard) => {
178 let should_emit = guard
179 .get(which)
180 .is_none_or(|last| now.saturating_duration_since(*last) >= cooldown);
181 if should_emit {
182 guard.insert(which, now);
183 }
184 should_emit
185 }
186 Err(poisoned) => {
187 let mut guard = poisoned.into_inner();
188 let should_emit = guard
189 .get(which)
190 .is_none_or(|last| now.saturating_duration_since(*last) >= cooldown);
191 if should_emit {
192 guard.insert(which, now);
193 }
194 should_emit
195 }
196 };
197
198 if should_warn {
199 tracing::warn!(which = which, "CRL map cap exceeded; dropping newest entry");
200 }
201 }
202
203 async fn insert_cache_entry(&self, url: String, cached: CachedCrl) -> bool {
204 let inserted = {
205 let mut guard = self.cache.write().await;
206 if guard.len() >= self.config.crl_max_cache_entries && !guard.contains_key(&url) {
207 false
208 } else {
209 guard.insert(url.clone(), cached);
210 true
211 }
212 };
213
214 if inserted {
215 match self.cached_urls.lock() {
216 Ok(mut cached_urls) => {
217 cached_urls.insert(url);
218 }
219 Err(poisoned) => {
220 poisoned.into_inner().insert(url);
221 }
222 }
223 } else {
224 self.warn_cap_exceeded_throttled("cache");
225 }
226
227 inserted
228 }
229
230 pub async fn force_refresh(&self) -> Result<(), McpxError> {
236 let urls = {
237 let cache = self.cache.read().await;
238 cache.keys().cloned().collect::<Vec<_>>()
239 };
240 self.refresh_urls(urls).await
241 }
242
243 async fn refresh_due_urls(&self) -> Result<(), McpxError> {
244 let now = SystemTime::now();
245 let urls = {
246 let cache = self.cache.read().await;
247 cache
248 .iter()
249 .filter(|(_, cached)| {
250 should_refresh_cached(cached, now, self.config.crl_refresh_interval)
251 })
252 .map(|(url, _)| url.clone())
253 .collect::<Vec<_>>()
254 };
255
256 if urls.is_empty() {
257 return Ok(());
258 }
259
260 self.refresh_urls(urls).await
261 }
262
    /// Fetches `urls` concurrently and folds the outcomes into the cache,
    /// then rebuilds the verifier if anything changed.
    ///
    /// * A successful fetch replaces (or adds) the entry, subject to
    ///   `crl_max_cache_entries`.
    /// * A failed fetch is logged; the existing entry is evicted only when
    ///   it has a `next_update` and `next_update + crl_stale_grace` is
    ///   already in the past. Evicted URLs are also removed from
    ///   `cached_urls` and `seen_urls`.
    ///
    /// # Errors
    ///
    /// Returns the error from `swap_verifier_from_cache` if the verifier
    /// cannot be rebuilt.
    async fn refresh_urls(&self, urls: Vec<String>) -> Result<(), McpxError> {
        // All network I/O completes before the write lock is taken.
        let results = self.fetch_url_results(urls).await;
        let now = SystemTime::now();
        let mut cache = self.cache.write().await;
        let mut changed = false;

        for (url, result) in results {
            match result {
                Ok(cached) => {
                    // New keys are refused at capacity; replacements are not.
                    if cache.len() >= self.config.crl_max_cache_entries && !cache.contains_key(&url)
                    {
                        // The write guard is released around the warning and
                        // re-acquired before the loop continues.
                        drop(cache);
                        self.warn_cap_exceeded_throttled("cache");
                        cache = self.cache.write().await;
                        continue;
                    }
                    cache.insert(url.clone(), cached);
                    changed = true;
                    // Keep the synchronous mirror in step; poison is recovered.
                    match self.cached_urls.lock() {
                        Ok(mut cached_urls) => {
                            cached_urls.insert(url);
                        }
                        Err(poisoned) => {
                            poisoned.into_inner().insert(url);
                        }
                    }
                }
                Err(error) => {
                    // Entries without `next_update` are never evicted here.
                    let remove_entry = cache.get(&url).is_some_and(|existing| {
                        existing
                            .next_update
                            .and_then(|next| next.checked_add(self.config.crl_stale_grace))
                            .is_some_and(|deadline| now > deadline)
                    });
                    tracing::warn!(url = %url, error = %error, "CRL refresh failed");
                    if remove_entry {
                        cache.remove(&url);
                        changed = true;
                        match self.cached_urls.lock() {
                            Ok(mut cached_urls) => {
                                cached_urls.remove(&url);
                            }
                            Err(poisoned) => {
                                poisoned.into_inner().remove(&url);
                            }
                        }
                        // Forgetting the URL here means `note_discovered_urls`
                        // will treat it as new the next time it is observed.
                        match self.seen_urls.lock() {
                            Ok(mut seen_urls) => {
                                seen_urls.remove(&url);
                            }
                            Err(poisoned) => {
                                poisoned.into_inner().remove(&url);
                            }
                        }
                    }
                }
            }
        }

        // Rebuild once, under the same write guard, only if needed.
        if changed {
            self.swap_verifier_from_cache(&cache)?;
        }

        Ok(())
    }
328
329 async fn fetch_and_store_url(&self, url: String) -> Result<(), McpxError> {
330 let cached = gated_fetch(
331 &self.client,
332 &self.global_fetch_sem,
333 &self.host_semaphores,
334 &url,
335 self.config.crl_allow_http,
336 self.max_response_bytes,
337 self.config.crl_max_host_semaphores,
338 )
339 .await?;
340 if !self.insert_cache_entry(url, cached).await {
341 return Ok(());
342 }
343 let cache = self.cache.read().await;
344 self.swap_verifier_from_cache(&cache)?;
345 Ok(())
346 }
347
348 fn note_discovered_urls(&self, urls: &[String]) -> bool {
349 let mut missing_cached = false;
354
355 let candidates: Vec<String> = match self.seen_urls.lock() {
366 Ok(seen) => urls
367 .iter()
368 .filter(|url| !seen.contains(*url))
369 .cloned()
370 .collect(),
371 Err(_) => Vec::new(),
372 };
373
374 for url in candidates {
381 if self.discovery_limiter.check().is_err() {
382 tracing::warn!(
383 url = %url,
384 "discovery_rate_limited: dropped CDP URL beyond per-minute cap (will be retried on next handshake observing this URL)"
385 );
386 continue;
387 }
388 if self.discover_tx.send(url.clone()).is_err() {
389 tracing::debug!(
392 url = %url,
393 "discover channel closed; dropping CDP URL without marking seen"
394 );
395 continue;
396 }
397 let mut guard = self
399 .seen_urls
400 .lock()
401 .unwrap_or_else(std::sync::PoisonError::into_inner);
402 if guard.len() >= self.config.crl_max_seen_urls {
403 self.warn_cap_exceeded_throttled("seen_urls");
404 break;
405 }
406 guard.insert(url);
407 }
408
409 if self.config.crl_deny_on_unavailable {
410 let cached = self
411 .cached_urls
412 .lock()
413 .ok()
414 .map(|guard| guard.clone())
415 .unwrap_or_default();
416 missing_cached = urls.iter().any(|url| !cached.contains(url));
417 }
418
419 missing_cached
420 }
421
422 #[doc(hidden)]
428 pub fn __test_with_prepopulated_crls(
429 roots: Arc<RootCertStore>,
430 config: MtlsConfig,
431 prefilled_crls: Vec<CertificateRevocationListDer<'static>>,
432 ) -> Result<Arc<Self>, McpxError> {
433 let (discover_tx, discover_rx) = mpsc::unbounded_channel();
434 drop(discover_rx);
435
436 let mut initial_cache = HashMap::new();
437 for (index, der) in prefilled_crls.into_iter().enumerate() {
438 let source_url = format!("memory://crl/{index}");
439 let (this_update, next_update) = parse_crl_metadata(der.as_ref())?;
440 initial_cache.insert(
441 source_url.clone(),
442 CachedCrl {
443 der,
444 this_update,
445 next_update,
446 fetched_at: SystemTime::now(),
447 source_url,
448 },
449 );
450 }
451
452 Self::new(roots, config, discover_tx, initial_cache)
453 }
454
455 #[doc(hidden)]
467 pub fn __test_with_kept_receiver(
468 roots: Arc<RootCertStore>,
469 config: MtlsConfig,
470 prefilled_crls: Vec<CertificateRevocationListDer<'static>>,
471 ) -> Result<(Arc<Self>, mpsc::UnboundedReceiver<String>), McpxError> {
472 let (discover_tx, discover_rx) = mpsc::unbounded_channel();
473
474 let mut initial_cache = HashMap::new();
475 for (index, der) in prefilled_crls.into_iter().enumerate() {
476 let source_url = format!("memory://crl/{index}");
477 let (this_update, next_update) = parse_crl_metadata(der.as_ref())?;
478 initial_cache.insert(
479 source_url.clone(),
480 CachedCrl {
481 der,
482 this_update,
483 next_update,
484 fetched_at: SystemTime::now(),
485 source_url,
486 },
487 );
488 }
489
490 let crl_set = Self::new(roots, config, discover_tx, initial_cache)?;
491 Ok((crl_set, discover_rx))
492 }
493
494 #[doc(hidden)]
499 pub fn __test_check_discovery_rate(&self, urls: &[String]) -> (usize, usize) {
500 let mut accepted = 0usize;
501 let mut dropped = 0usize;
502 for url in urls {
503 if self.discovery_limiter.check().is_ok() {
504 let _ = self.discover_tx.send(url.clone());
505 accepted += 1;
506 } else {
507 dropped += 1;
508 }
509 }
510 (accepted, dropped)
511 }
512
513 #[doc(hidden)]
517 pub fn __test_note_discovered_urls(&self, urls: &[String]) -> bool {
518 let missing_cached = self.note_discovered_urls(urls);
519 if self.discover_tx.is_closed() {
520 match self.seen_urls.lock() {
521 Ok(mut guard) => {
522 for url in urls {
523 if guard.contains(url) {
524 continue;
525 }
526 if guard.len() >= self.config.crl_max_seen_urls {
527 self.warn_cap_exceeded_throttled("seen_urls");
528 break;
529 }
530 guard.insert(url.clone());
531 }
532 }
533 Err(poisoned) => {
534 let mut guard = poisoned.into_inner();
535 for url in urls {
536 if guard.contains(url) {
537 continue;
538 }
539 if guard.len() >= self.config.crl_max_seen_urls {
540 self.warn_cap_exceeded_throttled("seen_urls");
541 break;
542 }
543 guard.insert(url.clone());
544 }
545 }
546 }
547 }
548 missing_cached
549 }
550
551 #[doc(hidden)]
556 pub fn __test_is_seen(&self, url: &str) -> bool {
557 match self.seen_urls.lock() {
558 Ok(seen) => seen.contains(url),
559 Err(_) => false,
560 }
561 }
562
563 #[cfg(any(test, feature = "test-helpers"))]
566 #[doc(hidden)]
567 pub fn __test_host_semaphore_count(&self) -> usize {
568 self.host_semaphores
569 .try_lock()
570 .map_or(0, |guard| guard.len())
571 }
572
573 #[cfg(any(test, feature = "test-helpers"))]
575 #[doc(hidden)]
576 pub fn __test_cache_len(&self) -> usize {
577 self.cache.try_read().map_or(0, |guard| guard.len())
578 }
579
580 #[cfg(any(test, feature = "test-helpers"))]
582 #[doc(hidden)]
583 pub fn __test_cache_contains(&self, url: &str) -> bool {
584 self.cache
585 .try_read()
586 .is_ok_and(|guard| guard.contains_key(url))
587 }
588
589 #[cfg(any(test, feature = "test-helpers"))]
596 #[doc(hidden)]
597 pub async fn __test_trigger_fetch(&self, url: &str) -> Result<(), McpxError> {
598 if let Err(error) = gated_fetch(
599 &self.client,
600 &self.global_fetch_sem,
601 &self.host_semaphores,
602 url,
603 self.config.crl_allow_http,
604 self.max_response_bytes,
605 self.config.crl_max_host_semaphores,
606 )
607 .await
608 {
609 if error
610 .to_string()
611 .contains("crl_host_semaphore_cap_exceeded")
612 {
613 Err(error)
614 } else {
615 Ok(())
616 }
617 } else {
618 Ok(())
619 }
620 }
621
622 #[cfg(any(test, feature = "test-helpers"))]
634 #[doc(hidden)]
635 pub async fn __test_insert_cache(&self, url: &str, cached: CachedCrl) {
636 let _ = self.insert_cache_entry(url.to_owned(), cached).await;
637 }
638
639 #[cfg(any(test, feature = "test-helpers"))]
644 #[doc(hidden)]
645 pub async fn __test_trigger_refresh_url(&self, url: &str) -> Result<(), McpxError> {
646 self.refresh_urls(vec![url.to_owned()]).await
647 }
648
649 async fn fetch_url_results(
650 &self,
651 urls: Vec<String>,
652 ) -> Vec<(String, Result<CachedCrl, McpxError>)> {
653 let mut tasks = JoinSet::new();
654 for url in urls {
655 let client = self.client.clone();
656 let global_sem = Arc::clone(&self.global_fetch_sem);
657 let host_map = Arc::clone(&self.host_semaphores);
658 let allow_http = self.config.crl_allow_http;
659 let max_bytes = self.max_response_bytes;
660 let max_host_semaphores = self.config.crl_max_host_semaphores;
661 tasks.spawn(async move {
662 let result = gated_fetch(
663 &client,
664 &global_sem,
665 &host_map,
666 &url,
667 allow_http,
668 max_bytes,
669 max_host_semaphores,
670 )
671 .await;
672 (url, result)
673 });
674 }
675
676 let mut results = Vec::new();
677 while let Some(joined) = tasks.join_next().await {
678 match joined {
679 Ok(result) => results.push(result),
680 Err(error) => {
681 tracing::warn!(error = %error, "CRL refresh task join failed");
682 }
683 }
684 }
685
686 results
687 }
688
689 fn swap_verifier_from_cache(
690 &self,
691 cache: &impl std::ops::Deref<Target = HashMap<String, CachedCrl>>,
692 ) -> Result<(), McpxError> {
693 let verifier = rebuild_verifier(&self.roots, &self.config, cache)?;
694 self.inner_verifier
695 .store(Arc::new(VerifierHandle(verifier)));
696 Ok(())
697 }
698}
699
700impl CachedCrl {
701 #[cfg(any(test, feature = "test-helpers"))]
705 #[doc(hidden)]
706 #[must_use]
707 pub fn __test_synthetic(now: SystemTime) -> Self {
708 Self {
709 der: CertificateRevocationListDer::from(vec![0x30, 0x00]),
710 this_update: now,
711 next_update: now.checked_add(Duration::from_hours(24)),
712 fetched_at: now,
713 source_url: "test://synthetic".to_owned(),
714 }
715 }
716
717 #[cfg(any(test, feature = "test-helpers"))]
721 #[doc(hidden)]
722 #[must_use]
723 pub fn __test_stale(reference_past: SystemTime) -> Self {
724 Self {
725 der: CertificateRevocationListDer::from(vec![0x30, 0x00]),
726 this_update: reference_past,
727 next_update: Some(reference_past),
728 fetched_at: reference_past,
729 source_url: "test://stale".to_owned(),
730 }
731 }
732}
733
/// `ClientCertVerifier` that delegates to whatever verifier the shared
/// `CrlSet` currently holds, and feeds CDP URLs seen in presented
/// certificates back into CRL discovery.
pub struct DynamicClientCertVerifier {
    /// Shared CRL state and the swappable inner verifier.
    inner: Arc<CrlSet>,
    /// Root subject names captured at construction; returned from
    /// `root_hint_subjects`.
    dn_subjects: Vec<DistinguishedName>,
}
740
741impl DynamicClientCertVerifier {
742 #[must_use]
744 pub fn new(inner: Arc<CrlSet>) -> Self {
745 Self {
746 dn_subjects: inner.roots.subjects(),
747 inner,
748 }
749 }
750}
751
752impl std::fmt::Debug for DynamicClientCertVerifier {
753 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
754 f.debug_struct("DynamicClientCertVerifier")
755 .field("dn_subjects_len", &self.dn_subjects.len())
756 .finish_non_exhaustive()
757 }
758}
759
760impl ClientCertVerifier for DynamicClientCertVerifier {
761 fn offer_client_auth(&self) -> bool {
762 let verifier = self.inner.inner_verifier.load();
763 verifier.0.offer_client_auth()
764 }
765
766 fn client_auth_mandatory(&self) -> bool {
767 let verifier = self.inner.inner_verifier.load();
768 verifier.0.client_auth_mandatory()
769 }
770
771 fn root_hint_subjects(&self) -> &[DistinguishedName] {
772 &self.dn_subjects
773 }
774
775 fn verify_client_cert(
776 &self,
777 end_entity: &CertificateDer<'_>,
778 intermediates: &[CertificateDer<'_>],
779 now: UnixTime,
780 ) -> Result<ClientCertVerified, TlsError> {
781 let mut discovered =
782 extract_cdp_urls(end_entity.as_ref(), self.inner.config.crl_allow_http);
783 for intermediate in intermediates {
784 discovered.extend(extract_cdp_urls(
785 intermediate.as_ref(),
786 self.inner.config.crl_allow_http,
787 ));
788 }
789 discovered.sort();
790 discovered.dedup();
791
792 if self.inner.note_discovered_urls(&discovered) {
793 return Err(TlsError::General(
794 "client certificate revocation status unavailable".to_owned(),
795 ));
796 }
797
798 let verifier = self.inner.inner_verifier.load();
799 verifier
800 .0
801 .verify_client_cert(end_entity, intermediates, now)
802 }
803
804 fn verify_tls12_signature(
805 &self,
806 message: &[u8],
807 cert: &CertificateDer<'_>,
808 dss: &DigitallySignedStruct,
809 ) -> Result<HandshakeSignatureValid, TlsError> {
810 let verifier = self.inner.inner_verifier.load();
811 verifier.0.verify_tls12_signature(message, cert, dss)
812 }
813
814 fn verify_tls13_signature(
815 &self,
816 message: &[u8],
817 cert: &CertificateDer<'_>,
818 dss: &DigitallySignedStruct,
819 ) -> Result<HandshakeSignatureValid, TlsError> {
820 let verifier = self.inner.inner_verifier.load();
821 verifier.0.verify_tls13_signature(message, cert, dss)
822 }
823
824 fn supported_verify_schemes(&self) -> Vec<SignatureScheme> {
825 let verifier = self.inner.inner_verifier.load();
826 verifier.0.supported_verify_schemes()
827 }
828
829 fn requires_raw_public_keys(&self) -> bool {
830 let verifier = self.inner.inner_verifier.load();
831 verifier.0.requires_raw_public_keys()
832 }
833}
834
835#[must_use]
843pub fn extract_cdp_urls(cert_der: &[u8], allow_http: bool) -> Vec<String> {
844 let Ok((_, cert)) = X509Certificate::from_der(cert_der) else {
845 return Vec::new();
846 };
847
848 let mut urls = Vec::new();
849 for ext in cert.extensions() {
850 if let ParsedExtension::CRLDistributionPoints(cdps) = ext.parsed_extension() {
851 for point in cdps.iter() {
852 if let Some(DistributionPointName::FullName(names)) = &point.distribution_point {
853 for name in names {
854 if let GeneralName::URI(uri) = name {
855 let raw = *uri;
856 let Ok(parsed) = Url::parse(raw) else {
857 tracing::debug!(url = %raw, "CDP URL parse failed; dropped");
858 continue;
859 };
860 if let Err(reason) = check_scheme(&parsed, allow_http) {
861 tracing::debug!(
862 url = %raw,
863 reason,
864 "CDP URL rejected by scheme guard; dropped"
865 );
866 continue;
867 }
868 urls.push(parsed.into());
869 }
870 }
871 }
872 }
873 }
874 }
875
876 urls
877}
878
/// Builds the initial `CrlSet` by fetching the CRLs referenced by the CA
/// certificates' CDP extensions.
///
/// Fetches run concurrently, bounded by `crl_max_concurrent_fetches`, and
/// the whole phase is bounded by `BOOTSTRAP_TIMEOUT`. Individual fetch
/// failures and a timeout are logged but are not fatal: the set is built
/// from whatever was fetched in time.
///
/// Returns the set together with the receiving half of the discovery
/// channel.
///
/// # Errors
///
/// Returns `McpxError::Startup` if the HTTP client cannot be built, and
/// propagates any error from `CrlSet::new`.
#[allow(
    clippy::cognitive_complexity,
    reason = "bootstrap coordinates timeout, parallel fetches, and partial-cache recovery"
)]
pub async fn bootstrap_fetch(
    roots: Arc<RootCertStore>,
    ca_certs: &[CertificateDer<'static>],
    config: MtlsConfig,
) -> Result<(Arc<CrlSet>, mpsc::UnboundedReceiver<String>), McpxError> {
    let (discover_tx, discover_rx) = mpsc::unbounded_channel();

    // Distinct CDP URLs across all CA certificates.
    let mut urls = ca_certs
        .iter()
        .flat_map(|cert| extract_cdp_urls(cert.as_ref(), config.crl_allow_http))
        .collect::<Vec<_>>();
    urls.sort();
    urls.dedup();

    // Bootstrap uses its own client and semaphores; `CrlSet::new` builds
    // separate ones for the long-lived set.
    let client = reqwest::Client::builder()
        .timeout(config.crl_fetch_timeout)
        .connect_timeout(CRL_CONNECT_TIMEOUT)
        .tcp_keepalive(None)
        .redirect(reqwest::redirect::Policy::none())
        .user_agent(format!("rmcp-server-kit/{}", env!("CARGO_PKG_VERSION")))
        .build()
        .map_err(|error| McpxError::Startup(format!("CRL HTTP client init: {error}")))?;

    let bootstrap_concurrency = config.crl_max_concurrent_fetches.max(1);
    let global_sem = Arc::new(Semaphore::new(bootstrap_concurrency));
    let host_semaphores = Arc::new(tokio::sync::Mutex::new(HashMap::new()));
    let allow_http = config.crl_allow_http;
    let max_bytes = config.crl_max_response_bytes;
    let max_host_semaphores = config.crl_max_host_semaphores;

    let mut initial_cache = HashMap::new();
    let mut tasks = JoinSet::new();
    for url in &urls {
        let client = client.clone();
        let url = url.clone();
        let global_sem = Arc::clone(&global_sem);
        let host_semaphores = Arc::clone(&host_semaphores);
        tasks.spawn(async move {
            let result = gated_fetch(
                &client,
                &global_sem,
                &host_semaphores,
                &url,
                allow_http,
                max_bytes,
                max_host_semaphores,
            )
            .await;
            (url, result)
        });
    }

    // A single timer covers the whole bootstrap phase, not each fetch.
    let timeout: Sleep = tokio::time::sleep(BOOTSTRAP_TIMEOUT);
    tokio::pin!(timeout);

    while !tasks.is_empty() {
        tokio::select! {
            () = &mut timeout => {
                // Leaving the loop keeps only the CRLs collected so far.
                tracing::warn!("CRL bootstrap timed out after {:?}", BOOTSTRAP_TIMEOUT);
                break;
            }
            maybe_joined = tasks.join_next() => {
                let Some(joined) = maybe_joined else {
                    break;
                };
                match joined {
                    Ok((url, Ok(cached))) => {
                        initial_cache.insert(url, cached);
                    }
                    Ok((url, Err(error))) => {
                        tracing::warn!(url = %url, error = %error, "CRL bootstrap fetch failed");
                    }
                    Err(error) => {
                        tracing::warn!(error = %error, "CRL bootstrap task join failed");
                    }
                }
            }
        }
    }

    let set = CrlSet::new(roots, config, discover_tx, initial_cache)?;
    Ok((set, discover_rx))
}
975
976#[allow(
978 clippy::cognitive_complexity,
979 reason = "refresher loop intentionally handles shutdown, timer, and discovery in one select"
980)]
981pub async fn run_crl_refresher(
982 set: Arc<CrlSet>,
983 mut discover_rx: mpsc::UnboundedReceiver<String>,
984 shutdown: CancellationToken,
985) {
986 let mut refresh_sleep = schedule_next_refresh(&set).await;
987
988 loop {
989 tokio::select! {
990 () = shutdown.cancelled() => {
991 break;
992 }
993 () = &mut refresh_sleep => {
994 if let Err(error) = set.refresh_due_urls().await {
995 tracing::warn!(error = %error, "CRL periodic refresh failed");
996 }
997 refresh_sleep = schedule_next_refresh(&set).await;
998 }
999 maybe_url = discover_rx.recv() => {
1000 let Some(url) = maybe_url else {
1001 break;
1002 };
1003 if let Err(error) = set.fetch_and_store_url(url.clone()).await {
1004 tracing::warn!(url = %url, error = %error, "CRL discovery fetch failed");
1005 }
1006 refresh_sleep = schedule_next_refresh(&set).await;
1007 }
1008 }
1009 }
1010}
1011
1012pub fn rebuild_verifier<S: std::hash::BuildHasher>(
1018 roots: &Arc<RootCertStore>,
1019 config: &MtlsConfig,
1020 cache: &HashMap<String, CachedCrl, S>,
1021) -> Result<Arc<dyn ClientCertVerifier>, McpxError> {
1022 let mut builder = WebPkiClientVerifier::builder(Arc::clone(roots));
1023
1024 if !cache.is_empty() {
1025 let crls = cache
1026 .values()
1027 .map(|cached| cached.der.clone())
1028 .collect::<Vec<_>>();
1029 builder = builder.with_crls(crls);
1030 }
1031 if config.crl_end_entity_only {
1032 builder = builder.only_check_end_entity_revocation();
1033 }
1034 if !config.crl_deny_on_unavailable {
1035 builder = builder.allow_unknown_revocation_status();
1036 }
1037 if config.crl_enforce_expiration {
1038 builder = builder.enforce_revocation_expiration();
1039 }
1040 if !config.required {
1041 builder = builder.allow_unauthenticated();
1042 }
1043
1044 builder
1045 .build()
1046 .map_err(|error| McpxError::Tls(format!("mTLS verifier error: {error}")))
1047}
1048
1049pub fn parse_crl_metadata(der: &[u8]) -> Result<(SystemTime, Option<SystemTime>), McpxError> {
1055 let (_, crl) = CertificateRevocationList::from_der(der)
1056 .map_err(|error| McpxError::Tls(format!("invalid CRL DER: {error:?}")))?;
1057
1058 Ok((
1059 asn1_time_to_system_time(crl.last_update()),
1060 crl.next_update().map(asn1_time_to_system_time),
1061 ))
1062}
1063
1064async fn schedule_next_refresh(set: &CrlSet) -> Pin<Box<Sleep>> {
1065 let duration = next_refresh_delay(set).await;
1066 boxed_sleep(duration)
1067}
1068
1069fn boxed_sleep(duration: Duration) -> Pin<Box<Sleep>> {
1070 Box::pin(tokio::time::sleep_until(Instant::now() + duration))
1071}
1072
1073async fn next_refresh_delay(set: &CrlSet) -> Duration {
1074 if let Some(interval) = set.config.crl_refresh_interval {
1075 return clamp_refresh(interval);
1076 }
1077
1078 let now = SystemTime::now();
1079 let cache = set.cache.read().await;
1080 let mut next = MAX_AUTO_REFRESH;
1081
1082 for cached in cache.values() {
1083 if let Some(next_update) = cached.next_update {
1084 let duration = next_update.duration_since(now).unwrap_or(Duration::ZERO);
1085 next = next.min(clamp_refresh(duration));
1086 }
1087 }
1088
1089 next
1090}
1091
1092async fn gated_fetch(
1099 client: &reqwest::Client,
1100 global_sem: &Arc<Semaphore>,
1101 host_semaphores: &Arc<tokio::sync::Mutex<HashMap<String, Arc<Semaphore>>>>,
1102 url: &str,
1103 allow_http: bool,
1104 max_bytes: u64,
1105 max_host_semaphores: usize,
1106) -> Result<CachedCrl, McpxError> {
1107 let host_key = Url::parse(url)
1108 .ok()
1109 .and_then(|u| u.host_str().map(str::to_owned))
1110 .unwrap_or_else(|| url.to_owned());
1111
1112 let host_sem = {
1113 let mut map = host_semaphores.lock().await;
1114 if !map.contains_key(&host_key) {
1115 if map.len() >= max_host_semaphores {
1116 return Err(McpxError::Config(
1117 "crl_host_semaphore_cap_exceeded: too many distinct CRL hosts in flight"
1118 .to_owned(),
1119 ));
1120 }
1121 map.insert(host_key.clone(), Arc::new(Semaphore::new(1)));
1122 }
1123 match map.get(&host_key) {
1124 Some(semaphore) => Arc::clone(semaphore),
1125 None => {
1126 return Err(McpxError::Tls(
1127 "CRL host semaphore missing after insertion".to_owned(),
1128 ));
1129 }
1130 }
1131 };
1132
1133 let _global_permit = Arc::clone(global_sem)
1134 .acquire_owned()
1135 .await
1136 .map_err(|error| McpxError::Tls(format!("CRL global semaphore closed: {error}")))?;
1137 let _host_permit = host_sem
1138 .acquire_owned()
1139 .await
1140 .map_err(|error| McpxError::Tls(format!("CRL host semaphore closed: {error}")))?;
1141
1142 fetch_crl(client, url, allow_http, max_bytes).await
1143}
1144
1145async fn fetch_crl(
1146 client: &reqwest::Client,
1147 url: &str,
1148 allow_http: bool,
1149 max_bytes: u64,
1150) -> Result<CachedCrl, McpxError> {
1151 let parsed =
1152 Url::parse(url).map_err(|error| McpxError::Tls(format!("CRL URL parse {url}: {error}")))?;
1153
1154 if let Err(reason) = check_scheme(&parsed, allow_http) {
1155 tracing::warn!(url = %url, reason, "CRL fetch denied: scheme");
1156 return Err(McpxError::Tls(format!(
1157 "CRL scheme rejected ({reason}): {url}"
1158 )));
1159 }
1160
1161 let host = parsed
1162 .host_str()
1163 .ok_or_else(|| McpxError::Tls(format!("CRL URL has no host: {url}")))?;
1164 let port = parsed
1165 .port_or_known_default()
1166 .ok_or_else(|| McpxError::Tls(format!("CRL URL has no known port: {url}")))?;
1167
1168 let addrs = lookup_host((host, port))
1169 .await
1170 .map_err(|error| McpxError::Tls(format!("CRL DNS resolution {url}: {error}")))?;
1171
1172 let mut any_addr = false;
1173 for addr in addrs {
1174 any_addr = true;
1175 if let Some(reason) = ip_block_reason(addr.ip()) {
1176 tracing::warn!(
1177 url = %url,
1178 resolved_ip = %addr.ip(),
1179 reason,
1180 "CRL fetch denied: blocked IP"
1181 );
1182 return Err(McpxError::Tls(format!(
1183 "CRL host resolved to blocked IP ({reason}): {url}"
1184 )));
1185 }
1186 }
1187 if !any_addr {
1188 return Err(McpxError::Tls(format!(
1189 "CRL DNS resolution returned no addresses: {url}"
1190 )));
1191 }
1192
1193 let mut response = client
1194 .get(url)
1195 .send()
1196 .await
1197 .map_err(|error| McpxError::Tls(format!("CRL fetch {url}: {error}")))?
1198 .error_for_status()
1199 .map_err(|error| McpxError::Tls(format!("CRL fetch {url}: {error}")))?;
1200
1201 let initial_capacity = usize::try_from(max_bytes.min(64 * 1024)).unwrap_or(64 * 1024);
1204 let mut body: Vec<u8> = Vec::with_capacity(initial_capacity);
1205 while let Some(chunk) = response
1206 .chunk()
1207 .await
1208 .map_err(|error| McpxError::Tls(format!("CRL read {url}: {error}")))?
1209 {
1210 let chunk_len = u64::try_from(chunk.len()).unwrap_or(u64::MAX);
1211 let body_len = u64::try_from(body.len()).unwrap_or(u64::MAX);
1212 if body_len.saturating_add(chunk_len) > max_bytes {
1213 return Err(McpxError::Tls(format!(
1214 "CRL body exceeded cap of {max_bytes} bytes: {url}"
1215 )));
1216 }
1217 body.extend_from_slice(&chunk);
1218 }
1219
1220 let der = CertificateRevocationListDer::from(body);
1221 let (this_update, next_update) = parse_crl_metadata(der.as_ref())?;
1222
1223 Ok(CachedCrl {
1224 der,
1225 this_update,
1226 next_update,
1227 fetched_at: SystemTime::now(),
1228 source_url: url.to_owned(),
1229 })
1230}
1231
1232fn should_refresh_cached(
1233 cached: &CachedCrl,
1234 now: SystemTime,
1235 fixed_interval: Option<Duration>,
1236) -> bool {
1237 if let Some(interval) = fixed_interval {
1238 return cached
1239 .fetched_at
1240 .checked_add(clamp_refresh(interval))
1241 .is_none_or(|deadline| now >= deadline);
1242 }
1243
1244 cached
1245 .next_update
1246 .is_none_or(|next_update| now >= next_update)
1247}
1248
1249fn clamp_refresh(duration: Duration) -> Duration {
1250 duration.clamp(MIN_AUTO_REFRESH, MAX_AUTO_REFRESH)
1251}
1252
1253fn asn1_time_to_system_time(time: x509_parser::time::ASN1Time) -> SystemTime {
1254 let timestamp = time.timestamp();
1255 if timestamp >= 0 {
1256 let seconds = u64::try_from(timestamp).unwrap_or(0);
1257 UNIX_EPOCH + Duration::from_secs(seconds)
1258 } else {
1259 UNIX_EPOCH - Duration::from_secs(timestamp.unsigned_abs())
1260 }
1261}