1#![allow(missing_docs)] use std::collections::HashSet;
4use std::fmt::Display;
5use std::pin::Pin;
6use std::sync::Arc;
7use std::time::Duration;
8
9use apollo_compiler::validation::Valid;
10use http::StatusCode;
11use http::Version;
12use http::header::CACHE_CONTROL;
13use multimap::MultiMap;
14use serde::Deserialize;
15use serde::Serialize;
16use serde_json_bytes::ByteString;
17use serde_json_bytes::Map as JsonMap;
18use serde_json_bytes::Value;
19use sha2::Digest;
20use sha2::Sha256;
21use static_assertions::assert_impl_all;
22use tokio::sync::broadcast;
23use tokio::sync::mpsc;
24use tokio_stream::Stream;
25use tower::BoxError;
26
27use crate::Context;
28use crate::batching::BatchQuery;
29use crate::error::Error;
30use crate::graphql;
31use crate::http_ext::TryIntoHeaderName;
32use crate::http_ext::TryIntoHeaderValue;
33use crate::http_ext::header_map;
34use crate::json_ext::Object;
35use crate::json_ext::Path;
36use crate::plugins::authentication::APOLLO_AUTHENTICATION_JWT_CLAIMS;
37use crate::plugins::authorization::CacheKeyMetadata;
38use crate::plugins::response_cache::cache_control::CacheControl;
39use crate::query_planner::fetch::OperationKind;
40use crate::spec::QueryHash;
41
/// Boxed `tower` service that takes a subgraph [`Request`] and yields a
/// subgraph [`Response`], failing with a [`BoxError`].
pub type BoxService = tower::util::BoxService<Request, Response, BoxError>;
/// Same request, response and error types as [`BoxService`], but based on
/// `tower::util::BoxCloneService`.
pub type BoxCloneService = tower::util::BoxCloneService<Request, Response, BoxError>;
/// Outcome of a subgraph service call: a [`Response`] on success, a
/// [`BoxError`] otherwise.
pub type ServiceResult = Result<Response, BoxError>;
/// Pinned, boxed stream of GraphQL responses that is both `Send` and `Sync`.
pub(crate) type BoxGqlStream = Pin<Box<dyn Stream<Item = graphql::Response> + Send + Sync>>;
/// Identifier carried by subgraph requests and responses.
///
/// This is a thin wrapper around an owned `String`; the wrapped value is
/// public and can be read or replaced directly.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct SubgraphRequestId(pub String);
49
50impl Display for SubgraphRequestId {
51 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
52 write!(f, "{}", self.0)
53 }
54}
55
// Compile-time guarantee that `Request` can be moved across threads.
assert_impl_all!(Request: Send);
/// A request addressed to a single subgraph.
#[non_exhaustive]
pub struct Request {
    /// Shared (reference-counted) HTTP request wrapping a GraphQL request.
    /// NOTE(review): presumably the supergraph-level request this subgraph
    /// call was derived from — confirm against the callers.
    pub supergraph_request: Arc<http::Request<graphql::Request>>,

    /// HTTP request, with a GraphQL request as its body, for the subgraph.
    pub subgraph_request: http::Request<graphql::Request>,

    /// Kind of the operation carried by this request.
    pub operation_kind: OperationKind,

    /// Context attached to this request.
    pub context: Context,

    /// Name of the subgraph this request is associated with.
    pub(crate) subgraph_name: String,
    /// Optional channel sender through which a boxed GraphQL response stream
    /// can be sent.
    /// NOTE(review): presumably only set for subscriptions — confirm.
    pub(crate) subscription_stream: Option<mpsc::Sender<BoxGqlStream>>,
    /// Optional broadcast receiver.
    /// NOTE(review): by its name, signals that the connection was closed —
    /// confirm against the sender side.
    pub(crate) connection_closed_signal: Option<broadcast::Receiver<()>>,

    /// Shared hash of the query; the constructor initialises it to
    /// `QueryHash::default()`.
    pub(crate) query_hash: Arc<QueryHash>,

    /// Shared authorization metadata; the constructor initialises it to its
    /// default value.
    pub(crate) authorization: Arc<CacheKeyMetadata>,

    /// Optional shared, validated executable document; when present it is used
    /// to look up the root fields of the subgraph operation.
    pub(crate) executable_document: Option<Arc<Valid<apollo_compiler::ExecutableDocument>>>,

    /// Identifier of this subgraph request, generated by the constructor.
    pub(crate) id: SubgraphRequestId,
}
85
86#[buildstructor::buildstructor]
87impl Request {
88 #[builder(visibility = "pub")]
92 fn new(
93 supergraph_request: Arc<http::Request<graphql::Request>>,
94 subgraph_request: http::Request<graphql::Request>,
95 operation_kind: OperationKind,
96 context: Context,
97 subscription_stream: Option<mpsc::Sender<BoxGqlStream>>,
98 subgraph_name: String,
99 connection_closed_signal: Option<broadcast::Receiver<()>>,
100 executable_document: Option<Arc<Valid<apollo_compiler::ExecutableDocument>>>,
101 ) -> Request {
102 Self {
103 supergraph_request,
104 subgraph_request,
105 operation_kind,
106 context,
107 subgraph_name,
108 subscription_stream,
109 connection_closed_signal,
110 query_hash: QueryHash::default().into(),
114 authorization: Default::default(),
115 executable_document,
116 id: SubgraphRequestId::new(),
117 }
118 }
119
120 #[builder(visibility = "pub")]
126 fn fake_new(
127 supergraph_request: Option<Arc<http::Request<graphql::Request>>>,
128 subgraph_request: Option<http::Request<graphql::Request>>,
129 operation_kind: Option<OperationKind>,
130 context: Option<Context>,
131 subscription_stream: Option<mpsc::Sender<BoxGqlStream>>,
132 subgraph_name: Option<String>,
133 connection_closed_signal: Option<broadcast::Receiver<()>>,
134 ) -> Request {
135 Request::new(
136 supergraph_request.unwrap_or_default(),
137 subgraph_request.unwrap_or_default(),
138 operation_kind.unwrap_or(OperationKind::Query),
139 context.unwrap_or_default(),
140 subscription_stream,
141 subgraph_name.unwrap_or_default(),
142 connection_closed_signal,
143 None,
144 )
145 }
146
147 pub(crate) fn is_part_of_batch(&self) -> bool {
148 self.context
149 .extensions()
150 .with_lock(|lock| lock.contains_key::<BatchQuery>())
151 }
152
153 pub(crate) fn subgraph_operation_name(&self) -> Option<&str> {
154 self.subgraph_request.body().operation_name.as_deref()
155 }
156
157 pub(crate) fn root_operation_fields(&self) -> Vec<String> {
158 self.executable_document
159 .as_ref()
160 .and_then(|executable_document| {
161 let operation_name = self.subgraph_operation_name();
162 Some(
163 executable_document
164 .operations
165 .get(operation_name)
166 .ok()?
167 .root_fields(executable_document)
168 .map(|f| f.name.to_string())
169 .collect(),
170 )
171 })
172 .unwrap_or_default()
173 }
174}
175
176impl Clone for Request {
177 fn clone(&self) -> Self {
178 let mut builder = http::Request::builder()
181 .method(self.subgraph_request.method())
182 .version(self.subgraph_request.version())
183 .uri(self.subgraph_request.uri());
184
185 {
186 let headers = builder.headers_mut().unwrap();
187 headers.extend(
188 self.subgraph_request
189 .headers()
190 .iter()
191 .map(|(name, value)| (name.clone(), value.clone())),
192 );
193 }
194 let subgraph_request = builder.body(self.subgraph_request.body().clone()).unwrap();
195
196 Self {
197 supergraph_request: self.supergraph_request.clone(),
198 subgraph_request,
199 operation_kind: self.operation_kind,
200 context: self.context.clone(),
201 subgraph_name: self.subgraph_name.clone(),
202 subscription_stream: self.subscription_stream.clone(),
203 connection_closed_signal: self
204 .connection_closed_signal
205 .as_ref()
206 .map(|s| s.resubscribe()),
207 query_hash: self.query_hash.clone(),
208 authorization: self.authorization.clone(),
209 executable_document: self.executable_document.clone(),
210 id: self.id.clone(),
211 }
212 }
213}
214
215impl SubgraphRequestId {
216 pub fn new() -> Self {
217 SubgraphRequestId(
218 uuid::Uuid::new_v4()
219 .as_hyphenated()
220 .encode_lower(&mut uuid::Uuid::encode_buffer())
221 .to_string(),
222 )
223 }
224}
225
226impl std::ops::Deref for SubgraphRequestId {
227 type Target = str;
228
229 fn deref(&self) -> &str {
230 &self.0
231 }
232}
233
234impl Default for SubgraphRequestId {
235 fn default() -> Self {
236 Self::new()
237 }
238}
239
// Compile-time guarantee that `Response` can be moved across threads.
assert_impl_all!(Response: Send);
/// A response coming back from a single subgraph.
#[derive(Debug)]
#[non_exhaustive]
pub struct Response {
    /// HTTP response whose body is a GraphQL response.
    pub response: http::Response<graphql::Response>,
    /// Name of the subgraph this response is associated with.
    pub(crate) subgraph_name: String,
    /// Context attached to this response.
    pub context: Context,
    /// Subgraph request identifier carried by this response.
    /// NOTE(review): presumably the id of the request being answered —
    /// confirm against the callers.
    pub(crate) id: SubgraphRequestId,
}
251
252#[buildstructor::buildstructor]
253impl Response {
254 pub(crate) fn new_from_response(
259 response: http::Response<graphql::Response>,
260 context: Context,
261 subgraph_name: String,
262 id: SubgraphRequestId,
263 ) -> Self {
264 Self {
265 response,
266 context,
267 subgraph_name,
268 id,
269 }
270 }
271
272 #[builder(visibility = "pub")]
277 fn new(
278 label: Option<String>,
279 data: Option<Value>,
280 path: Option<Path>,
281 errors: Vec<Error>,
282 extensions: Object,
283 status_code: Option<StatusCode>,
284 context: Context,
285 headers: Option<http::HeaderMap<http::HeaderValue>>,
286 subgraph_name: String,
287 id: Option<SubgraphRequestId>,
288 ) -> Self {
289 let res = graphql::Response::builder()
291 .and_label(label)
292 .data(data.unwrap_or_default())
293 .and_path(path)
294 .errors(errors)
295 .extensions(extensions)
296 .build();
297
298 let mut response = http::Response::builder()
300 .status(status_code.unwrap_or(StatusCode::OK))
301 .body(res)
302 .expect("Response is serializable; qed");
303
304 *response.headers_mut() = headers.unwrap_or_default();
305
306 let id = id.unwrap_or_default();
310
311 Self {
312 response,
313 context,
314 subgraph_name,
315 id,
316 }
317 }
318
319 #[builder(visibility = "pub")]
325 fn fake_new(
326 label: Option<String>,
327 data: Option<Value>,
328 path: Option<Path>,
329 errors: Vec<Error>,
330 extensions: JsonMap<ByteString, Value>,
332 status_code: Option<StatusCode>,
333 context: Option<Context>,
334 headers: Option<http::HeaderMap<http::HeaderValue>>,
335 subgraph_name: Option<String>,
336 id: Option<SubgraphRequestId>,
337 ) -> Self {
338 Self::new(
339 label,
340 data,
341 path,
342 errors,
343 extensions,
344 status_code,
345 context.unwrap_or_default(),
346 headers,
347 subgraph_name.unwrap_or_default(),
348 id,
349 )
350 }
351
352 #[builder(visibility = "pub")]
359 fn fake2_new(
360 label: Option<String>,
361 data: Option<Value>,
362 path: Option<Path>,
363 errors: Vec<Error>,
364 extensions: JsonMap<ByteString, Value>,
366 status_code: Option<StatusCode>,
367 context: Option<Context>,
368 headers: MultiMap<TryIntoHeaderName, TryIntoHeaderValue>,
369 subgraph_name: Option<String>,
370 id: Option<SubgraphRequestId>,
371 ) -> Result<Response, BoxError> {
372 Ok(Self::new(
373 label,
374 data,
375 path,
376 errors,
377 extensions,
378 status_code,
379 context.unwrap_or_default(),
380 Some(header_map(headers)?),
381 subgraph_name.unwrap_or_default(),
382 id,
383 ))
384 }
385
386 #[builder(visibility = "pub")]
390 fn error_new(
391 errors: Vec<Error>,
392 status_code: Option<StatusCode>,
393 context: Context,
394 subgraph_name: String,
395 id: Option<SubgraphRequestId>,
396 ) -> Self {
397 Self::new(
398 Default::default(),
399 Default::default(),
400 Default::default(),
401 errors,
402 Default::default(),
403 status_code,
404 context,
405 Default::default(),
406 subgraph_name,
407 id,
408 )
409 }
410
411 pub(crate) fn subgraph_cache_control(
412 &self,
413 default_ttl: Option<Duration>,
414 ) -> Result<CacheControl, BoxError> {
415 if self.response.headers().contains_key(&CACHE_CONTROL) {
416 CacheControl::new(self.response.headers(), default_ttl)
417 } else {
418 Ok(CacheControl::no_store())
419 }
420 }
421
422 pub(crate) fn get_from_extensions(&self, key: &str) -> Option<&Value> {
423 self.response.body().extensions.get(key)
424 }
425}
426
427impl Request {
428 pub(crate) fn to_sha256(&self, ignored_headers: &HashSet<String>) -> String {
429 let mut hasher = Sha256::new();
430 let http_req = &self.subgraph_request;
431 hasher.update(http_req.method().as_str().as_bytes());
432
433 let version = match http_req.version() {
435 Version::HTTP_09 => "HTTP/0.9",
436 Version::HTTP_10 => "HTTP/1.0",
437 Version::HTTP_11 => "HTTP/1.1",
438 Version::HTTP_2 => "HTTP/2.0",
439 Version::HTTP_3 => "HTTP/3.0",
440 _ => "unknown",
441 };
442 hasher.update(version.as_bytes());
443 let uri = http_req.uri();
444 if let Some(scheme) = uri.scheme() {
445 hasher.update(scheme.as_str().as_bytes());
446 }
447 if let Some(authority) = uri.authority() {
448 hasher.update(authority.as_str().as_bytes());
449 }
450 if let Some(query) = uri.query() {
451 hasher.update(query.as_bytes());
452 }
453
454 for (name, value) in http_req
456 .headers()
457 .iter()
458 .filter(|(name, _)| !ignored_headers.contains(name.as_str()))
459 {
460 hasher.update(name.as_str().as_bytes());
461 hasher.update(value.to_str().unwrap_or("ERROR").as_bytes());
462 }
463 if let Some(claim) = self
464 .context
465 .get_json_value(APOLLO_AUTHENTICATION_JWT_CLAIMS)
466 {
467 hasher.update(format!("{claim:?}").as_bytes());
468 }
469 let body = http_req.body();
470 if let Some(operation_name) = &body.operation_name {
471 hasher.update(operation_name.as_bytes());
472 }
473 if let Some(query) = &body.query {
474 hasher.update(query.as_bytes());
475 }
476 for (var_name, var_value) in &body.variables {
477 hasher.update(var_name.inner());
478 hasher.update(var_value.to_bytes());
479 }
480 for (name, val) in &body.extensions {
481 hasher.update(name.inner());
482 hasher.update(val.to_bytes());
483 }
484
485 hex::encode(hasher.finalize())
486 }
487}
488
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a fake subgraph request whose `auth` header is fixed and whose
    /// `public_header` header carries `public_value`.
    fn request_with_public_header(public_value: &str) -> Request {
        let http_request = http::Request::builder()
            .header("public_header", public_value)
            .header("auth", "my_token")
            .body(graphql::Request::default())
            .unwrap();

        Request::fake_builder()
            .subgraph_request(http_request)
            .build()
    }

    #[test]
    fn test_subgraph_request_hash() {
        // Requests that differ only in an ignored header hash identically.
        let first = request_with_public_header("value");
        let second = request_with_public_header("value_bis");
        let ignore_public: HashSet<String> =
            std::iter::once("public_header".to_string()).collect();
        assert_eq!(
            first.to_sha256(&ignore_public),
            second.to_sha256(&ignore_public)
        );

        // With nothing ignored, the differing header changes the hash.
        let first = request_with_public_header("value");
        let second = request_with_public_header("value_bis");
        let ignore_nothing: HashSet<String> = HashSet::new();
        assert_ne!(
            first.to_sha256(&ignore_nothing),
            second.to_sha256(&ignore_nothing)
        );
    }
}