1use std::{
2 collections::{HashMap, HashSet},
3 sync::{
4 Arc,
5 RwLock,
6 Weak,
7 atomic::{AtomicBool, AtomicUsize, Ordering},
8 },
9 time::{SystemTime, UNIX_EPOCH},
10};
11
12use dcap_qvl::{QuoteCollateralV3, collateral::CollateralClient, tcb_info::TcbInfo};
13use thiserror::Error;
14use time::{OffsetDateTime, format_description::well_known::Rfc3339};
15use tokio::{
16 sync::{Semaphore, watch},
17 task::{JoinHandle, JoinSet},
18 time::{Duration, sleep},
19};
20use tracing::debug;
21use x509_parser::{prelude::FromDer, revocation_list::CertificateRevocationList};
22
/// Default collateral service base URL, used by `Pccs::new_without_prewarm` when the caller
/// passes no URL.
pub const PCS_URL: &str = "https://api.trustedservices.intel.com";
/// Seconds subtracted from a cache entry's `next_update` when computing how long the
/// background refresh should sleep (see `refresh_sleep_seconds`).
const REFRESH_MARGIN_SECS: i64 = 300;
/// Seconds the background refresh loop sleeps before retrying after a failed step.
const REFRESH_RETRY_SECS: u64 = 60;
/// Number of semaphore permits that bound concurrent fetches during startup prewarm.
const STARTUP_PREWARM_CONCURRENCY: usize = 8;
32
/// Collateral client with an in-memory cache keyed by FMSPC and CA.
///
/// The cache, the pending-refresh set and the prewarm statistics are held behind `Arc`, so
/// clones of a `Pccs` share them.
#[derive(Clone)]
pub struct Pccs {
    /// Base URL of the collateral service, as normalized by `new_without_prewarm`.
    url: String,
    /// Cached collateral entries keyed by FMSPC and CA.
    cache: Arc<RwLock<HashMap<PccsInput, CacheEntry>>>,
    /// Keys for which a cache-miss repair fetch is currently in flight.
    pending_refreshes: Arc<RwLock<HashSet<PccsInput>>>,
    /// Counters updated while the startup prewarm runs.
    prewarm_stats: Arc<PrewarmStats>,
    /// Sender side of the prewarm outcome channel; `None` when the instance was built
    /// without prewarm.
    prewarm_outcome_tx: Option<watch::Sender<Option<PrewarmOutcome>>>,
}
47
48impl std::fmt::Debug for Pccs {
49 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
52 f.debug_struct("Pccs").field("url", &self.url).finish_non_exhaustive()
53 }
54}
55
56impl Pccs {
57 pub fn new(url: Option<String>) -> Self {
59 let mut pccs = Self::new_without_prewarm(url);
60
61 let (prewarm_outcome_tx, _) = watch::channel(None);
62 pccs.prewarm_outcome_tx = Some(prewarm_outcome_tx);
63
64 let pccs_for_prewarm = pccs.clone();
66 tokio::spawn(async move {
67 let outcome = pccs_for_prewarm.startup_prewarm_all_tdx().await;
68 pccs_for_prewarm.finish_prewarm(outcome);
69 });
70
71 pccs
72 }
73
74 pub fn new_without_prewarm(url: Option<String>) -> Self {
77 let url = url
78 .unwrap_or(PCS_URL.to_string())
79 .trim_end_matches('/')
80 .trim_end_matches("/sgx/certification/v4")
81 .trim_end_matches("/tdx/certification/v4")
82 .to_string();
83
84 Self {
85 url,
86 cache: RwLock::new(HashMap::new()).into(),
87 pending_refreshes: RwLock::new(HashSet::new()).into(),
88 prewarm_stats: Arc::new(PrewarmStats::default()),
89 prewarm_outcome_tx: None,
90 }
91 }
92
93 pub async fn ready(&self) -> Result<PrewarmSummary, PccsError> {
95 if let Some(prewarm_outcome_tx) = &self.prewarm_outcome_tx {
96 let mut outcome_rx = prewarm_outcome_tx.subscribe();
97 loop {
98 if let Some(outcome) = outcome_rx.borrow_and_update().clone() {
99 return match outcome {
100 PrewarmOutcome::Ready(summary) => Ok(summary),
101 PrewarmOutcome::Failed(message) => Err(PccsError::PrewarmFailed(message)),
102 };
103 }
104 if outcome_rx.changed().await.is_err() {
105 return Err(PccsError::PrewarmSignalClosed);
106 }
107 }
108 } else {
109 Err(PccsError::PrewarmDisabled)
110 }
111 }
112
113 pub async fn get_collateral(
118 &self,
119 fmspc: String,
120 ca: &'static str,
121 now: u64,
122 ) -> Result<(QuoteCollateralV3, bool), PccsError> {
123 let now = i64::try_from(now).map_err(|_| PccsError::TimeStampExceedsI64)?;
124 let cache_key = PccsInput::new(fmspc.clone(), ca);
125
126 {
127 let cache = self.cache.read().map_err(|_| PccsError::CachePoisoned)?;
128 if let Some(entry) = cache.get(&cache_key) {
129 if now < entry.next_update {
130 return Ok((entry.collateral.clone(), false));
131 }
132 tracing::warn!(
133 fmspc,
134 next_update = entry.next_update,
135 now,
136 "Cached collateral expired, refreshing from PCCS"
137 );
138 }
139 }
140
141 let collateral = fetch_collateral(&self.url, fmspc.clone(), ca).await?;
142 let next_update = extract_next_update(&collateral, now)?;
143
144 {
145 let mut cache = self.cache.write().map_err(|_| PccsError::CachePoisoned)?;
146 if let Some(existing) = cache.get(&cache_key) &&
147 now < existing.next_update
148 {
149 return Ok((existing.collateral.clone(), false));
150 }
151
152 upsert_cache_entry(&mut cache, cache_key.clone(), collateral.clone(), next_update);
153 }
154 self.ensure_refresh_task(&cache_key).await;
155 Ok((collateral, true))
156 }
157
158 pub fn get_collateral_sync(
167 &self,
168 fmspc: String,
169 ca: &'static str,
170 now: u64,
171 ) -> Result<QuoteCollateralV3, PccsError> {
172 let now = i64::try_from(now).map_err(|_| PccsError::TimeStampExceedsI64)?;
173 let cache_key = PccsInput::new(fmspc.clone(), ca);
174 let cache = self.cache.read().map_err(|_| PccsError::CachePoisoned)?;
175 if let Some(entry) = cache.get(&cache_key) {
176 if now >= entry.next_update {
177 let collateral = entry.collateral.clone();
178 tracing::warn!(
179 fmspc,
180 next_update = entry.next_update,
181 now,
182 "Cached collateral expired"
183 );
184 drop(cache);
185
186 let pccs = self.clone();
188 tokio::spawn(async move {
189 pccs.ensure_refresh_task(&cache_key).await;
190 });
191
192 return Ok(collateral);
193 }
194 Ok(entry.collateral.clone())
195 } else {
196 drop(cache);
197 self.spawn_background_refresh_for_cache_miss(cache_key.clone());
198 Err(PccsError::NoCollateralForFmspc(format!("{cache_key:?}")))
199 }
200 }
201
202 async fn refresh_collateral(
205 &self,
206 fmspc: String,
207 ca: &'static str,
208 ) -> Result<QuoteCollateralV3, PccsError> {
209 let now = unix_now()?;
210 let collateral = fetch_collateral(&self.url, fmspc.clone(), ca).await?;
211 let next_update = extract_next_update(&collateral, now)?;
212 let cache_key = PccsInput::new(fmspc, ca);
213
214 {
215 let mut cache = self.cache.write().map_err(|_| PccsError::CachePoisoned)?;
216 upsert_cache_entry(&mut cache, cache_key.clone(), collateral.clone(), next_update);
217 }
218 self.ensure_refresh_task(&cache_key).await;
219 Ok(collateral)
220 }
221
222 #[allow(clippy::unused_async)]
225 async fn ensure_refresh_task(&self, cache_key: &PccsInput) {
226 let Ok(mut cache) = self.cache.write() else {
227 tracing::warn!("PCCS cache lock poisoned, cannot ensure refresh task");
228 return;
229 };
230 let Some(entry) = cache.get_mut(cache_key) else {
231 return;
232 };
233 if entry.refresh_task.is_some() {
234 return;
235 }
236
237 let weak_cache = Arc::downgrade(&self.cache);
238 let key = cache_key.clone();
239 let url = self.url.clone();
240 entry.refresh_task = Some(tokio::spawn(async move {
241 refresh_loop(weak_cache, url, key).await;
242 }));
243 }
244
245 fn spawn_background_refresh_for_cache_miss(&self, cache_key: PccsInput) {
247 {
248 let Ok(mut pending_refreshes) = self.pending_refreshes.write() else {
249 tracing::warn!("PCCS pending-refresh lock poisoned, cannot start sync refresh");
250 return;
251 };
252 if !pending_refreshes.insert(cache_key.clone()) {
253 return;
254 }
255 }
256
257 let pccs = self.clone();
258 tokio::spawn(async move {
259 let result = pccs
260 .refresh_collateral(
261 cache_key.fmspc.clone(),
262 ca_as_static(&cache_key.ca).expect("unsupported CA in pending refresh"),
263 )
264 .await;
265
266 if let Err(err) = result {
267 tracing::warn!(
268 fmspc = cache_key.fmspc,
269 ca = cache_key.ca,
270 error = %err,
271 "Sync-triggered PCCS cache repair failed"
272 );
273 }
274
275 if let Ok(mut pending_refreshes) = pccs.pending_refreshes.write() {
278 pending_refreshes.remove(&cache_key);
279 } else {
280 tracing::warn!("PCCS pending-refresh lock poisoned during cleanup");
281 }
282 });
283 }
284
285 async fn startup_prewarm_all_tdx(&self) -> PrewarmOutcome {
288 let fmspcs = match self.fetch_fmspcs().await {
290 Ok(fmspcs) => fmspcs,
291 Err(e) => {
292 tracing::warn!(error = %e, "Failed to fetch FMSPC list for startup pre-provision");
293 return PrewarmOutcome::Failed(format!(
294 "Failed to fetch FMSPC list for prewarm: {e}"
295 ));
296 }
297 };
298 self.prewarm_stats.discovered_fmspcs.store(fmspcs.len(), Ordering::SeqCst);
299
300 if fmspcs.is_empty() {
301 tracing::warn!("No FMSPC entries returned during startup pre-provision");
302 return PrewarmOutcome::Ready(self.prewarm_stats.snapshot());
303 }
304
305 let semaphore = Arc::new(Semaphore::new(STARTUP_PREWARM_CONCURRENCY));
308 let mut join_set = JoinSet::new();
309 for entry in fmspcs {
310 for ca in ["processor", "platform"] {
311 let permit = semaphore.clone().acquire_owned().await;
312 let Ok(permit) = permit else {
313 continue;
314 };
315 self.prewarm_stats.attempted.fetch_add(1, Ordering::SeqCst);
316 let pccs = self.clone();
317 let fmspc = entry.fmspc.clone();
318 join_set.spawn(async move {
319 let _permit = permit;
320 let result = pccs.refresh_collateral(fmspc.clone(), ca).await;
321 Ok::<(String, &'static str, Result<(), PccsError>), PccsError>((
322 fmspc,
323 ca,
324 result.map(|_| ()),
325 ))
326 });
327 }
328 }
329
330 let mut successes = 0usize;
332 let mut failures = 0usize;
333 while let Some(task_result) = join_set.join_next().await {
334 match task_result {
335 Ok(Ok((fmspc, ca, Ok(())))) => {
336 successes += 1;
337 debug!("Successfully cached: {fmspc} {ca}");
338 self.prewarm_stats.successes.fetch_add(1, Ordering::SeqCst);
339 }
340 Ok(Ok((fmspc, ca, Err(e)))) => {
341 failures += 1;
342 self.prewarm_stats.failures.fetch_add(1, Ordering::SeqCst);
343 tracing::debug!(
344 fmspc,
345 ca,
346 error = %e,
347 "Startup pre-provision: FMSPC/CA not cached:"
348 );
349 }
350 Ok(Err(e)) => {
351 failures += 1;
352 self.prewarm_stats.failures.fetch_add(1, Ordering::SeqCst);
353 tracing::debug!(error = %e, "Startup pre-provision task failed");
354 }
355 Err(e) => {
356 failures += 1;
357 self.prewarm_stats.failures.fetch_add(1, Ordering::SeqCst);
358 tracing::debug!(error = %e, "Startup pre-provision join error");
359 }
360 }
361 }
362 tracing::info!(
363 discovered_fmspcs = self.prewarm_stats.discovered_fmspcs.load(Ordering::SeqCst),
364 attempted = self.prewarm_stats.attempted.load(Ordering::SeqCst),
365 successes,
366 failures,
367 "Completed PCCS startup pre-provisioning for TDX collateral"
368 );
369 PrewarmOutcome::Ready(self.prewarm_stats.snapshot())
370 }
371
372 fn finish_prewarm(&self, outcome: PrewarmOutcome) {
373 if let Some(prewarm_outcome_tx) = &self.prewarm_outcome_tx {
374 self.prewarm_stats.completed.store(true, Ordering::SeqCst);
375 let _ = prewarm_outcome_tx.send(Some(outcome));
376 }
377 }
378
379 async fn fetch_fmspcs(&self) -> Result<Vec<FmspcEntry>, PccsError> {
381 let url = format!("{}/sgx/certification/v4/fmspcs", self.url);
382 let client = reqwest::Client::builder().timeout(Duration::from_secs(15)).build()?;
383 let response = client.get(&url).send().await?;
384 if !response.status().is_success() {
385 return Err(PccsError::FmspcFetch(response.status()));
386 }
387 let body = response.text().await?;
388 let entries: Vec<FmspcEntry> = serde_json::from_str(&body)?;
389 Ok(entries)
390 }
391}
392
/// Plain-value snapshot of the startup prewarm counters.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct PrewarmSummary {
    /// Number of entries in the FMSPC list returned by the service.
    pub discovered_fmspcs: usize,
    /// Number of FMSPC/CA fetches that were started.
    pub attempted: usize,
    /// Number of fetches that completed successfully.
    pub successes: usize,
    /// Number of fetches that failed (including tasks that could not be joined).
    pub failures: usize,
}
401
/// Final result of the startup prewarm, as published on the watch channel.
#[derive(Clone, Debug)]
enum PrewarmOutcome {
    /// Prewarm ran to completion; carries the final counters.
    Ready(PrewarmSummary),
    /// Prewarm could not run; carries a human-readable reason.
    Failed(String),
}
407
/// Cache key identifying one collateral set: an FMSPC plus the CA name.
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
struct PccsInput {
    /// FMSPC value as supplied by the caller.
    fmspc: String,
    /// CA name, stored as an owned copy of the `&'static str` given to `new`.
    ca: String,
}

impl PccsInput {
    /// Builds a key from an owned FMSPC and a static CA name.
    fn new(fmspc: String, ca: &'static str) -> Self {
        let ca = String::from(ca);
        Self { fmspc, ca }
    }
}
421
422async fn fetch_collateral(
424 url: &str,
425 fmspc: String,
426 ca: &'static str,
427) -> Result<QuoteCollateralV3, PccsError> {
428 CollateralClient::with_default_http(url)?
429 .fetch_for_fmspc_without_pck_chain(&fmspc, ca, false)
430 .await
431 .map_err(Into::into)
432}
433
434fn extract_next_update(collateral: &QuoteCollateralV3, now: i64) -> Result<i64, PccsError> {
442 let tcb_info: TcbInfo = serde_json::from_str(&collateral.tcb_info).map_err(|e| {
443 PccsError::PccsCollateralParse(format!("Failed to parse TCB info JSON: {e}"))
444 })?;
445 let qe_identity: QeIdentityNextUpdate =
446 serde_json::from_str(&collateral.qe_identity).map_err(|e| {
447 PccsError::PccsCollateralParse(format!("Failed to parse QE identity JSON: {e}"))
448 })?;
449
450 let tcb_next_update = parse_next_update("tcb_info.nextUpdate", &tcb_info.next_update)?;
451 let qe_next_update = parse_next_update("qe_identity.nextUpdate", &qe_identity.next_update)?;
452 let root_ca_crl_next_update =
453 parse_crl_next_update("root_ca_crl.nextUpdate", &collateral.root_ca_crl)?;
454 let pck_crl_next_update = parse_crl_next_update("pck_crl.nextUpdate", &collateral.pck_crl)?;
455 let next_update =
456 tcb_next_update.min(qe_next_update).min(root_ca_crl_next_update).min(pck_crl_next_update);
457
458 if now >= next_update {
459 return Err(PccsError::PccsCollateralExpired(format!(
460 "Collateral expired (tcb_next_update={}, qe_next_update={}, root_ca_crl_next_update={}, pck_crl_next_update={}, now={now})",
461 tcb_info.next_update,
462 qe_identity.next_update,
463 root_ca_crl_next_update,
464 pck_crl_next_update
465 )));
466 }
467
468 Ok(next_update)
469}
470
471fn parse_next_update(field: &str, value: &str) -> Result<i64, PccsError> {
473 OffsetDateTime::parse(value, &Rfc3339)
474 .map_err(|e| {
475 PccsError::PccsCollateralParse(format!("Failed to parse {field} as RFC3339: {e}"))
476 })
477 .map(|parsed| parsed.unix_timestamp())
478}
479
480fn parse_crl_next_update(field: &str, crl_der: &[u8]) -> Result<i64, PccsError> {
483 let (_, crl) = CertificateRevocationList::from_der(crl_der).map_err(|e| {
484 PccsError::PccsCollateralParse(format!("Failed to parse {field} as DER CRL: {e}"))
485 })?;
486 let next_update = crl
487 .next_update()
488 .ok_or_else(|| PccsError::PccsCollateralParse(format!("Missing {field} in DER CRL")))?;
489 Ok(next_update.timestamp())
490}
491
492fn unix_now() -> Result<i64, PccsError> {
494 Ok(SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs() as i64)
495}
496
497fn refresh_sleep_seconds(next_update: i64, now: i64) -> u64 {
499 let refresh_at = next_update - REFRESH_MARGIN_SECS;
500 if refresh_at <= now { 0 } else { (refresh_at - now) as u64 }
501}
502
503fn upsert_cache_entry(
506 cache: &mut HashMap<PccsInput, CacheEntry>,
507 key: PccsInput,
508 collateral: QuoteCollateralV3,
509 next_update: i64,
510) {
511 match cache.get_mut(&key) {
512 Some(existing) => {
513 existing.collateral = collateral;
514 existing.next_update = next_update;
515 }
516 None => {
517 cache.insert(key, CacheEntry { collateral, next_update, refresh_task: None });
518 }
519 }
520}
521
/// Maps a CA name to its `'static` equivalent.
///
/// Only `"processor"` and `"platform"` are recognised (exact, case-sensitive match); any
/// other value yields `None`.
fn ca_as_static(ca: &str) -> Option<&'static str> {
    const SUPPORTED: [&str; 2] = ["processor", "platform"];
    SUPPORTED.iter().copied().find(|known| *known == ca)
}
530
531async fn refresh_loop(
533 weak_cache: Weak<RwLock<HashMap<PccsInput, CacheEntry>>>,
534 pccs_url: String,
535 key: PccsInput,
536) {
537 let Some(ca_static) = ca_as_static(&key.ca) else {
538 tracing::warn!(ca = key.ca, "Unsupported collateral CA value, refresh loop stopping");
539 return;
540 };
541
542 loop {
543 let Some(cache) = weak_cache.upgrade() else {
544 return;
545 };
546 let next_update = {
547 let Ok(cache_guard) = cache.read() else {
548 tracing::warn!("PCCS cache lock poisoned, refresh loop stopping");
549 return;
550 };
551 let Some(entry) = cache_guard.get(&key) else {
552 return;
553 };
554 entry.next_update
555 };
556
557 let now = match unix_now() {
559 Ok(now) => now,
560 Err(e) => {
561 tracing::warn!(error = %e, "Failed to read system time for PCCS refresh");
562 sleep(Duration::from_secs(REFRESH_RETRY_SECS)).await;
563 continue;
564 }
565 };
566 let sleep_secs = refresh_sleep_seconds(next_update, now);
567 sleep(Duration::from_secs(sleep_secs)).await;
568
569 let now = match unix_now() {
571 Ok(now) => now,
572 Err(e) => {
573 tracing::warn!(error = %e, "Failed to read system time for PCCS refresh");
574 sleep(Duration::from_secs(REFRESH_RETRY_SECS)).await;
575 continue;
576 }
577 };
578 let Some(cache) = weak_cache.upgrade() else {
579 return;
580 };
581 let should_refresh = {
582 let Ok(cache_guard) = cache.read() else {
583 tracing::warn!("PCCS cache lock poisoned, refresh loop stopping");
584 return;
585 };
586 let Some(entry) = cache_guard.get(&key) else {
587 return;
588 };
589 refresh_sleep_seconds(entry.next_update, now) == 0
590 };
591 if !should_refresh {
592 continue;
594 }
595
596 match fetch_collateral(&pccs_url, key.fmspc.clone(), ca_static).await {
597 Ok(collateral) => {
598 let validate_now = match unix_now() {
599 Ok(timestamp) => timestamp,
600 Err(e) => {
601 tracing::warn!(
602 error = %e,
603 "Failed to read system time for PCCS refresh validation"
604 );
605 sleep(Duration::from_secs(REFRESH_RETRY_SECS)).await;
606 continue;
607 }
608 };
609 match extract_next_update(&collateral, validate_now) {
610 Ok(new_next_update) => {
611 let Some(cache) = weak_cache.upgrade() else {
612 return;
613 };
614 let Ok(mut cache_guard) = cache.write() else {
615 tracing::warn!("PCCS cache lock poisoned, refresh loop stopping");
616 return;
617 };
618 let Some(entry) = cache_guard.get_mut(&key) else {
619 return;
620 };
621 entry.collateral = collateral;
622 entry.next_update = new_next_update;
623 tracing::debug!(
624 fmspc = key.fmspc,
625 ca = key.ca,
626 next_update = new_next_update,
627 "Refreshed PCCS collateral in background"
628 );
629 }
630 Err(e) => {
631 tracing::warn!(
632 fmspc = key.fmspc,
633 ca = key.ca,
634 error = %e,
635 "Fetched PCCS collateral but nextUpdate validation failed"
636 );
637 sleep(Duration::from_secs(REFRESH_RETRY_SECS)).await;
638 }
639 }
640 }
641 Err(e) => {
642 tracing::warn!(
643 fmspc = key.fmspc,
644 ca = key.ca,
645 error = %e,
646 "Background PCCS collateral refresh failed"
647 );
648 sleep(Duration::from_secs(REFRESH_RETRY_SECS)).await;
649 }
650 }
651 }
652}
653
/// One cached collateral set together with its expiry and refresh task.
struct CacheEntry {
    /// The cached collateral.
    collateral: QuoteCollateralV3,
    /// Earliest `nextUpdate` of the collateral, as Unix seconds.
    next_update: i64,
    /// Handle of the background refresh task for this entry, once one has been spawned.
    refresh_task: Option<JoinHandle<()>>,
}
660
/// Minimal view of the QE identity JSON that captures only its `nextUpdate` field.
#[derive(serde::Deserialize)]
#[serde(rename_all = "camelCase")]
struct QeIdentityNextUpdate {
    /// Raw `nextUpdate` string; parsed as RFC 3339 by `extract_next_update`.
    next_update: String,
}
667
/// One element of the FMSPC list returned by the service.
#[derive(Debug, serde::Deserialize)]
struct FmspcEntry {
    /// FMSPC value used as the cache key component during prewarm.
    fmspc: String,
    /// Deserialized from the response but never read in this file, hence the lint allowance.
    #[allow(dead_code)]
    platform: String,
}
674
/// Atomic counters updated while the startup prewarm runs; all start at zero / `false`.
#[derive(Default)]
struct PrewarmStats {
    /// Number of entries in the fetched FMSPC list.
    discovered_fmspcs: AtomicUsize,
    /// Number of FMSPC/CA fetches that were started.
    attempted: AtomicUsize,
    /// Number of fetches that succeeded.
    successes: AtomicUsize,
    /// Number of fetches that failed.
    failures: AtomicUsize,
    /// Set to `true` by `finish_prewarm` once the outcome is published.
    completed: AtomicBool,
}
683
684impl PrewarmStats {
685 fn snapshot(&self) -> PrewarmSummary {
686 PrewarmSummary {
687 discovered_fmspcs: self.discovered_fmspcs.load(Ordering::SeqCst),
688 attempted: self.attempted.load(Ordering::SeqCst),
689 successes: self.successes.load(Ordering::SeqCst),
690 failures: self.failures.load(Ordering::SeqCst),
691 }
692 }
693}
694
/// Errors produced by the PCCS client and its cache.
#[derive(Error, Debug)]
pub enum PccsError {
    /// An `anyhow::Error`, converted automatically via `From`.
    #[error("DCAP quote verification: {0}")]
    DcapQvl(#[from] anyhow::Error),
    /// A collateral component (TCB info, QE identity or a CRL) could not be parsed.
    #[error("PCCS collateral parse error: {0}")]
    PccsCollateralParse(String),
    /// The collateral's earliest `nextUpdate` is not after the reference time.
    #[error("PCCS collateral expired: {0}")]
    PccsCollateralExpired(String),
    /// The system clock could not be expressed relative to the Unix epoch.
    #[error("System Time: {0}")]
    SystemTime(#[from] std::time::SystemTimeError),
    /// An HTTP client error from `reqwest`.
    #[error("HTTP client: {0}")]
    Reqwest(#[from] reqwest::Error),
    /// The FMSPC list request returned a non-success HTTP status.
    #[error("Failed to fetch FMSPC: {0}")]
    FmspcFetch(reqwest::StatusCode),
    /// A JSON document could not be deserialized.
    #[error("JSON: {0}")]
    Json(#[from] serde_json::Error),
    /// The startup prewarm reported a failure; carries its message.
    #[error("PCCS prewarm failed: {0}")]
    PrewarmFailed(String),
    /// The prewarm outcome channel closed before an outcome was published.
    #[error("PCCS prewarm signal channel closed before completion")]
    PrewarmSignalClosed,
    /// `ready` was called on an instance created without prewarm.
    #[error("PCCS prewarm is disabled for this instance")]
    PrewarmDisabled,
    /// A timestamp does not fit in `i64`.
    #[error("Timestamp exceeds i64 range")]
    TimeStampExceedsI64,
    /// The cache lock was poisoned.
    #[error("PCCS cache lock poisoned")]
    CachePoisoned,
    /// Nothing is cached for the requested key; carries the key's debug representation.
    #[error("No collateral in cache for FMSPC {0}")]
    NoCollateralForFmspc(String),
}
724
725#[cfg(test)]
726mod mock_pcs;
727
728#[cfg(test)]
729mod tests;