jwks_cache/http/
client.rs1use std::marker::PhantomData;
5use http::{
7 HeaderMap, Request, Response, StatusCode,
8 header::{CACHE_CONTROL, ETAG, LAST_MODIFIED},
9};
10use jsonwebtoken::jwk::JwkSet;
11use reqwest::Client;
12use crate::{_prelude::*, registry::IdentityProviderRegistration, security};
14
15#[derive(Clone, Debug)]
17pub struct HttpExchange {
18 pub request: Request<()>,
20 pub response: Response<()>,
22 pub elapsed: Duration,
24 _body: PhantomData<()>,
26}
27impl HttpExchange {
28 pub fn new(request: Request<()>, response: Response<()>, elapsed: Duration) -> Self {
30 Self { request, response, elapsed, _body: PhantomData }
31 }
32
33 pub fn headers(&self) -> &HeaderMap {
35 self.response.headers()
36 }
37
38 pub fn status(&self) -> StatusCode {
40 self.response.status()
41 }
42}
43
44#[derive(Clone, Debug)]
46pub struct HttpFetch {
47 pub exchange: HttpExchange,
49 pub jwks: Option<Arc<JwkSet>>,
51 pub etag: Option<String>,
53 pub last_modified: Option<DateTime<Utc>>,
55}
56
57pub async fn fetch_jwks(
59 client: &Client,
60 registration: &IdentityProviderRegistration,
61 request: &Request<()>,
62 attempt_timeout: Duration,
63) -> Result<HttpFetch> {
64 if registration.require_https {
65 security::enforce_https(®istration.jwks_url)?;
66 }
67
68 let method = request.method().clone();
69 let mut builder = client.request(method, registration.jwks_url.clone());
70
71 for (name, value) in request.headers().iter() {
72 builder = builder.header(name, value);
73 }
74
75 builder = builder.timeout(attempt_timeout);
76
77 let start = Instant::now();
78 let response = builder.send().await?;
79 let elapsed = start.elapsed();
80 let status = response.status();
81 let headers = response.headers().clone();
82 let mut response_builder = Response::builder().status(status);
83
84 if let Some(existing) = response_builder.headers_mut() {
85 existing.extend(headers.iter().map(|(name, value)| (name.clone(), value.clone())));
86 }
87
88 let response_template = response_builder.body(()).map_err(Error::from)?;
89 let etag = response_template
90 .headers()
91 .get(ETAG)
92 .and_then(|value| value.to_str().ok())
93 .map(|s| s.to_string());
94 let last_modified = response_template
95 .headers()
96 .get(LAST_MODIFIED)
97 .and_then(|value| value.to_str().ok())
98 .and_then(|raw| httpdate::parse_http_date(raw).ok())
99 .map(DateTime::<Utc>::from);
100
101 if status == StatusCode::NOT_MODIFIED {
102 let exchange = HttpExchange::new(request.clone(), response_template, elapsed);
103
104 return Ok(HttpFetch { exchange, jwks: None, etag, last_modified });
105 }
106 if !status.is_success() {
107 let body = response.text().await.ok();
108
109 return Err(Error::HttpStatus { status, url: registration.jwks_url.clone(), body });
110 }
111
112 let bytes = response.bytes().await?;
113
114 if bytes.len() as u64 > registration.max_response_bytes {
115 return Err(Error::Validation {
116 field: "max_response_bytes",
117 reason: format!(
118 "Response size {size} bytes exceeds the configured guard of {limit} bytes.",
119 size = bytes.len(),
120 limit = registration.max_response_bytes
121 ),
122 });
123 }
124
125 let jwks: JwkSet = serde_json::from_slice(&bytes)?;
126 let exchange = HttpExchange::new(request.clone(), response_template, elapsed);
127
128 tracing::debug!(
129 tenant = %registration.tenant_id,
130 provider = %registration.provider_id,
131 status = %status,
132 elapsed = ?elapsed,
133 "jwks fetch complete"
134 );
135
136 Ok(HttpFetch { exchange, jwks: Some(Arc::new(jwks)), etag, last_modified })
137}
138
139pub fn cache_control_header(headers: &HeaderMap) -> Option<String> {
141 headers.get(CACHE_CONTROL).and_then(|value| value.to_str().ok()).map(|s| s.to_string())
142}