1#![allow(missing_docs)] use std::collections::HashSet;
4use std::fmt::Display;
5use std::pin::Pin;
6use std::sync::Arc;
7use std::time::Duration;
8
9use apollo_compiler::validation::Valid;
10use http::StatusCode;
11use http::Version;
12use http::header::CACHE_CONTROL;
13use multimap::MultiMap;
14use serde::Deserialize;
15use serde::Serialize;
16use serde_json_bytes::ByteString;
17use serde_json_bytes::Map as JsonMap;
18use serde_json_bytes::Value;
19use sha2::Digest;
20use sha2::Sha256;
21use static_assertions::assert_impl_all;
22use tokio::sync::broadcast;
23use tokio::sync::mpsc;
24use tokio_stream::Stream;
25use tower::BoxError;
26
27use crate::Context;
28use crate::batching::BatchQuery;
29use crate::error::Error;
30use crate::graphql;
31use crate::http_ext::TryIntoHeaderName;
32use crate::http_ext::TryIntoHeaderValue;
33use crate::http_ext::header_map;
34use crate::json_ext::Object;
35use crate::json_ext::Path;
36use crate::plugins::authentication::APOLLO_AUTHENTICATION_JWT_CLAIMS;
37use crate::plugins::authentication::subgraph::SigningParamsConfig;
38use crate::plugins::authorization::CacheKeyMetadata;
39use crate::plugins::response_cache::cache_control::CacheControl;
40use crate::query_planner::fetch::OperationKind;
41use crate::spec::QueryHash;
42
43pub type BoxService = tower::util::BoxService<Request, Response, BoxError>;
44pub type BoxCloneService = tower::util::BoxCloneService<Request, Response, BoxError>;
45pub type ServiceResult = Result<Response, BoxError>;
46pub(crate) type BoxGqlStream = Pin<Box<dyn Stream<Item = graphql::Response> + Send + Sync>>;
47#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
49pub struct SubgraphRequestId(pub String);
50
51impl Display for SubgraphRequestId {
52 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
53 write!(f, "{}", self.0)
54 }
55}
56
57assert_impl_all!(Request: Send);
58#[non_exhaustive]
59pub struct Request {
60 pub supergraph_request: Arc<http::Request<graphql::Request>>,
62
63 pub subgraph_request: http::Request<graphql::Request>,
64
65 pub operation_kind: OperationKind,
66
67 pub context: Context,
68
69 pub(crate) subgraph_name: String,
71 pub(crate) subscription_stream: Option<mpsc::Sender<BoxGqlStream>>,
73 pub(crate) connection_closed_signal: Option<broadcast::Receiver<()>>,
75
76 pub(crate) query_hash: Arc<QueryHash>,
77
78 pub(crate) authorization: Arc<CacheKeyMetadata>,
80
81 pub(crate) executable_document: Option<Arc<Valid<apollo_compiler::ExecutableDocument>>>,
82
83 pub(crate) id: SubgraphRequestId,
85}
86
87#[buildstructor::buildstructor]
88impl Request {
89 #[builder(visibility = "pub")]
93 fn new(
94 supergraph_request: Arc<http::Request<graphql::Request>>,
95 subgraph_request: http::Request<graphql::Request>,
96 operation_kind: OperationKind,
97 context: Context,
98 subscription_stream: Option<mpsc::Sender<BoxGqlStream>>,
99 subgraph_name: String,
100 connection_closed_signal: Option<broadcast::Receiver<()>>,
101 executable_document: Option<Arc<Valid<apollo_compiler::ExecutableDocument>>>,
102 ) -> Request {
103 Self {
104 supergraph_request,
105 subgraph_request,
106 operation_kind,
107 context,
108 subgraph_name,
109 subscription_stream,
110 connection_closed_signal,
111 query_hash: QueryHash::default().into(),
115 authorization: Default::default(),
116 executable_document,
117 id: SubgraphRequestId::new(),
118 }
119 }
120
121 #[builder(visibility = "pub")]
127 fn fake_new(
128 supergraph_request: Option<Arc<http::Request<graphql::Request>>>,
129 subgraph_request: Option<http::Request<graphql::Request>>,
130 operation_kind: Option<OperationKind>,
131 context: Option<Context>,
132 subscription_stream: Option<mpsc::Sender<BoxGqlStream>>,
133 subgraph_name: Option<String>,
134 connection_closed_signal: Option<broadcast::Receiver<()>>,
135 ) -> Request {
136 Request::new(
137 supergraph_request.unwrap_or_default(),
138 subgraph_request.unwrap_or_default(),
139 operation_kind.unwrap_or(OperationKind::Query),
140 context.unwrap_or_default(),
141 subscription_stream,
142 subgraph_name.unwrap_or_default(),
143 connection_closed_signal,
144 None,
145 )
146 }
147
148 pub(crate) fn is_part_of_batch(&self) -> bool {
149 self.context
150 .extensions()
151 .with_lock(|lock| lock.contains_key::<BatchQuery>())
152 }
153
154 pub(crate) fn subgraph_operation_name(&self) -> Option<&str> {
155 self.subgraph_request.body().operation_name.as_deref()
156 }
157
158 pub(crate) fn root_operation_fields(&self) -> Vec<String> {
159 self.executable_document
160 .as_ref()
161 .and_then(|executable_document| {
162 let operation_name = self.subgraph_operation_name();
163 Some(
164 executable_document
165 .operations
166 .get(operation_name)
167 .ok()?
168 .root_fields(executable_document)
169 .map(|f| f.name.to_string())
170 .collect(),
171 )
172 })
173 .unwrap_or_default()
174 }
175}
176
177impl Clone for Request {
178 fn clone(&self) -> Self {
179 let mut builder = http::Request::builder()
181 .method(self.subgraph_request.method())
182 .version(self.subgraph_request.version())
183 .uri(self.subgraph_request.uri());
184
185 {
186 let headers = builder.headers_mut().unwrap();
187 headers.extend(
188 self.subgraph_request
189 .headers()
190 .iter()
191 .map(|(name, value)| (name.clone(), value.clone())),
192 );
193 }
194 let mut subgraph_request = builder.body(self.subgraph_request.body().clone()).unwrap();
195 if let Some(signing_params) = self
203 .subgraph_request
204 .extensions()
205 .get::<Arc<SigningParamsConfig>>()
206 .cloned()
207 {
208 subgraph_request.extensions_mut().insert(signing_params);
209 }
210
211 Self {
212 supergraph_request: self.supergraph_request.clone(),
213 subgraph_request,
214 operation_kind: self.operation_kind,
215 context: self.context.clone(),
216 subgraph_name: self.subgraph_name.clone(),
217 subscription_stream: self.subscription_stream.clone(),
218 connection_closed_signal: self
219 .connection_closed_signal
220 .as_ref()
221 .map(|s| s.resubscribe()),
222 query_hash: self.query_hash.clone(),
223 authorization: self.authorization.clone(),
224 executable_document: self.executable_document.clone(),
225 id: self.id.clone(),
226 }
227 }
228}
229
230impl SubgraphRequestId {
231 pub fn new() -> Self {
232 SubgraphRequestId(
233 uuid::Uuid::new_v4()
234 .as_hyphenated()
235 .encode_lower(&mut uuid::Uuid::encode_buffer())
236 .to_string(),
237 )
238 }
239}
240
241impl std::ops::Deref for SubgraphRequestId {
242 type Target = str;
243
244 fn deref(&self) -> &str {
245 &self.0
246 }
247}
248
249impl Default for SubgraphRequestId {
250 fn default() -> Self {
251 Self::new()
252 }
253}
254
255assert_impl_all!(Response: Send);
256#[derive(Debug)]
257#[non_exhaustive]
258pub struct Response {
259 pub response: http::Response<graphql::Response>,
260 pub(crate) subgraph_name: String,
262 pub context: Context,
263 pub(crate) id: SubgraphRequestId,
265}
266
267#[buildstructor::buildstructor]
268impl Response {
269 pub(crate) fn new_from_response(
274 response: http::Response<graphql::Response>,
275 context: Context,
276 subgraph_name: String,
277 id: SubgraphRequestId,
278 ) -> Self {
279 Self {
280 response,
281 context,
282 subgraph_name,
283 id,
284 }
285 }
286
287 #[builder(visibility = "pub")]
292 fn new(
293 label: Option<String>,
294 data: Option<Value>,
295 path: Option<Path>,
296 errors: Vec<Error>,
297 extensions: Object,
298 status_code: Option<StatusCode>,
299 context: Context,
300 headers: Option<http::HeaderMap<http::HeaderValue>>,
301 subgraph_name: String,
302 id: Option<SubgraphRequestId>,
303 ) -> Self {
304 let res = graphql::Response::builder()
306 .and_label(label)
307 .data(data.unwrap_or_default())
308 .and_path(path)
309 .errors(errors)
310 .extensions(extensions)
311 .build();
312
313 let mut response = http::Response::builder()
315 .status(status_code.unwrap_or(StatusCode::OK))
316 .body(res)
317 .expect("Response is serializable; qed");
318
319 *response.headers_mut() = headers.unwrap_or_default();
320
321 let id = id.unwrap_or_default();
325
326 Self {
327 response,
328 context,
329 subgraph_name,
330 id,
331 }
332 }
333
334 #[builder(visibility = "pub")]
340 fn fake_new(
341 label: Option<String>,
342 data: Option<Value>,
343 path: Option<Path>,
344 errors: Vec<Error>,
345 extensions: JsonMap<ByteString, Value>,
347 status_code: Option<StatusCode>,
348 context: Option<Context>,
349 headers: Option<http::HeaderMap<http::HeaderValue>>,
350 subgraph_name: Option<String>,
351 id: Option<SubgraphRequestId>,
352 ) -> Self {
353 Self::new(
354 label,
355 data,
356 path,
357 errors,
358 extensions,
359 status_code,
360 context.unwrap_or_default(),
361 headers,
362 subgraph_name.unwrap_or_default(),
363 id,
364 )
365 }
366
367 #[builder(visibility = "pub")]
374 fn fake2_new(
375 label: Option<String>,
376 data: Option<Value>,
377 path: Option<Path>,
378 errors: Vec<Error>,
379 extensions: JsonMap<ByteString, Value>,
381 status_code: Option<StatusCode>,
382 context: Option<Context>,
383 headers: MultiMap<TryIntoHeaderName, TryIntoHeaderValue>,
384 subgraph_name: Option<String>,
385 id: Option<SubgraphRequestId>,
386 ) -> Result<Response, BoxError> {
387 Ok(Self::new(
388 label,
389 data,
390 path,
391 errors,
392 extensions,
393 status_code,
394 context.unwrap_or_default(),
395 Some(header_map(headers)?),
396 subgraph_name.unwrap_or_default(),
397 id,
398 ))
399 }
400
401 #[builder(visibility = "pub")]
405 fn error_new(
406 errors: Vec<Error>,
407 status_code: Option<StatusCode>,
408 context: Context,
409 subgraph_name: String,
410 id: Option<SubgraphRequestId>,
411 ) -> Self {
412 Self::new(
413 Default::default(),
414 Default::default(),
415 Default::default(),
416 errors,
417 Default::default(),
418 status_code,
419 context,
420 Default::default(),
421 subgraph_name,
422 id,
423 )
424 }
425
426 pub(crate) fn subgraph_cache_control(
427 &self,
428 default_ttl: Option<Duration>,
429 ) -> Result<CacheControl, BoxError> {
430 if self.response.headers().contains_key(&CACHE_CONTROL) {
431 CacheControl::new(self.response.headers(), default_ttl)
432 } else {
433 Ok(CacheControl::no_store())
434 }
435 }
436
437 pub(crate) fn get_from_extensions(&self, key: &str) -> Option<&Value> {
438 self.response.body().extensions.get(key)
439 }
440}
441
442impl Request {
443 pub(crate) fn to_sha256(
444 &self,
445 ignored_headers: &HashSet<String>,
446 ignore_auth_context: bool,
447 ) -> String {
448 let mut hasher = Sha256::new();
449 let http_req = &self.subgraph_request;
450 hasher.update(http_req.method().as_str().as_bytes());
451
452 let version = match http_req.version() {
454 Version::HTTP_09 => "HTTP/0.9",
455 Version::HTTP_10 => "HTTP/1.0",
456 Version::HTTP_11 => "HTTP/1.1",
457 Version::HTTP_2 => "HTTP/2.0",
458 Version::HTTP_3 => "HTTP/3.0",
459 _ => "unknown",
460 };
461 hasher.update(version.as_bytes());
462 let uri = http_req.uri();
463 if let Some(scheme) = uri.scheme() {
464 hasher.update(scheme.as_str().as_bytes());
465 }
466 if let Some(authority) = uri.authority() {
467 hasher.update(authority.as_str().as_bytes());
468 }
469 if let Some(query) = uri.query() {
470 hasher.update(query.as_bytes());
471 }
472
473 for (name, value) in http_req
475 .headers()
476 .iter()
477 .filter(|(name, _)| !ignored_headers.contains(name.as_str()))
478 {
479 hasher.update(name.as_str().as_bytes());
480 hasher.update(value.to_str().unwrap_or("ERROR").as_bytes());
481 }
482 if !ignore_auth_context
483 && let Some(claim) = self
484 .context
485 .get_json_value(APOLLO_AUTHENTICATION_JWT_CLAIMS)
486 {
487 hasher.update(format!("{claim:?}").as_bytes());
488 }
489 let body = http_req.body();
490 if let Some(operation_name) = &body.operation_name {
491 hasher.update(operation_name.as_bytes());
492 }
493 if let Some(query) = &body.query {
494 hasher.update(query.as_bytes());
495 }
496 for (var_name, var_value) in &body.variables {
497 hasher.update(var_name.inner());
498 hasher.update(var_value.to_bytes());
499 }
500 for (name, val) in &body.extensions {
501 hasher.update(name.inner());
502 hasher.update(val.to_bytes());
503 }
504
505 hex::encode(hasher.finalize())
506 }
507}
508
509#[cfg(test)]
510mod tests {
511 use super::*;
512
513 #[test]
514 fn test_subgraph_request_hash() {
515 let subgraph_req_1 = Request::fake_builder()
516 .subgraph_request(
517 http::Request::builder()
518 .header("public_header", "value")
519 .header("auth", "my_token")
520 .body(graphql::Request::default())
521 .unwrap(),
522 )
523 .build();
524 let subgraph_req_2 = Request::fake_builder()
525 .subgraph_request(
526 http::Request::builder()
527 .header("public_header", "value_bis")
528 .header("auth", "my_token")
529 .body(graphql::Request::default())
530 .unwrap(),
531 )
532 .build();
533 let mut ignored_headers = HashSet::new();
534 ignored_headers.insert("public_header".to_string());
535 assert_eq!(
536 subgraph_req_1.to_sha256(&ignored_headers, false),
537 subgraph_req_2.to_sha256(&ignored_headers, false)
538 );
539
540 let subgraph_req_1 = Request::fake_builder()
541 .subgraph_request(
542 http::Request::builder()
543 .header("public_header", "value")
544 .header("auth", "my_token")
545 .body(graphql::Request::default())
546 .unwrap(),
547 )
548 .build();
549 let subgraph_req_2 = Request::fake_builder()
550 .subgraph_request(
551 http::Request::builder()
552 .header("public_header", "value_bis")
553 .header("auth", "my_token")
554 .body(graphql::Request::default())
555 .unwrap(),
556 )
557 .build();
558 let ignored_headers = HashSet::new();
559 assert_ne!(
560 subgraph_req_1.to_sha256(&ignored_headers, false),
561 subgraph_req_2.to_sha256(&ignored_headers, false)
562 );
563 }
564
565 #[test]
566 fn test_subgraph_request_hash_ignore_auth_context() {
567 use serde_json_bytes::json;
568
569 let req_with_claims_a = Request::fake_builder()
571 .subgraph_request(
572 http::Request::builder()
573 .body(graphql::Request::default())
574 .unwrap(),
575 )
576 .build();
577 req_with_claims_a
578 .context
579 .insert(APOLLO_AUTHENTICATION_JWT_CLAIMS, json!({"sub": "user-a"}))
580 .expect("insert JWT claims");
581
582 let req_with_claims_b = Request::fake_builder()
583 .subgraph_request(
584 http::Request::builder()
585 .body(graphql::Request::default())
586 .unwrap(),
587 )
588 .build();
589 req_with_claims_b
590 .context
591 .insert(APOLLO_AUTHENTICATION_JWT_CLAIMS, json!({"sub": "user-b"}))
592 .expect("insert JWT claims");
593
594 let ignored_headers = HashSet::new();
595
596 assert_ne!(
598 req_with_claims_a.to_sha256(&ignored_headers, false),
599 req_with_claims_b.to_sha256(&ignored_headers, false),
600 "requests with different JWT claims must hash differently by default"
601 );
602
603 assert_eq!(
605 req_with_claims_a.to_sha256(&ignored_headers, true),
606 req_with_claims_b.to_sha256(&ignored_headers, true),
607 "requests with different JWT claims must hash identically when ignore_auth_context is true"
608 );
609 }
610
611 #[test]
612 fn test_clone_does_not_copy_arbitrary_subgraph_request_extensions() {
613 #[derive(Clone, PartialEq, Debug)]
619 struct ShouldNotSurviveClone(u32);
620
621 let mut req = Request::fake_builder()
622 .subgraph_request(
623 http::Request::builder()
624 .body(graphql::Request::default())
625 .unwrap(),
626 )
627 .build();
628 req.subgraph_request
629 .extensions_mut()
630 .insert(ShouldNotSurviveClone(42));
631
632 let cloned = req.clone();
633 assert!(
634 cloned
635 .subgraph_request
636 .extensions()
637 .get::<ShouldNotSurviveClone>()
638 .is_none(),
639 "arbitrary extension types must not be copied when SubgraphRequest is cloned"
640 );
641 }
642}