1#![allow(missing_docs)] use std::collections::HashSet;
4use std::fmt::Display;
5use std::pin::Pin;
6use std::sync::Arc;
7
8use apollo_compiler::validation::Valid;
9use http::StatusCode;
10use http::Version;
11use multimap::MultiMap;
12use serde::Deserialize;
13use serde::Serialize;
14use serde_json_bytes::ByteString;
15use serde_json_bytes::Map as JsonMap;
16use serde_json_bytes::Value;
17use sha2::Digest;
18use sha2::Sha256;
19use static_assertions::assert_impl_all;
20use tokio::sync::broadcast;
21use tokio::sync::mpsc;
22use tokio_stream::Stream;
23use tower::BoxError;
24
25use crate::Context;
26use crate::error::Error;
27use crate::graphql;
28use crate::http_ext::TryIntoHeaderName;
29use crate::http_ext::TryIntoHeaderValue;
30use crate::http_ext::header_map;
31use crate::json_ext::Object;
32use crate::json_ext::Path;
33use crate::plugins::authentication::APOLLO_AUTHENTICATION_JWT_CLAIMS;
34use crate::plugins::authorization::CacheKeyMetadata;
35use crate::query_planner::fetch::OperationKind;
36use crate::spec::QueryHash;
37
38pub type BoxService = tower::util::BoxService<Request, Response, BoxError>;
39pub type BoxCloneService = tower::util::BoxCloneService<Request, Response, BoxError>;
40pub type ServiceResult = Result<Response, BoxError>;
41pub(crate) type BoxGqlStream = Pin<Box<dyn Stream<Item = graphql::Response> + Send + Sync>>;
42#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
44pub struct SubgraphRequestId(pub String);
45
46impl Display for SubgraphRequestId {
47 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
48 write!(f, "{}", self.0)
49 }
50}
51
52assert_impl_all!(Request: Send);
53#[non_exhaustive]
54pub struct Request {
55 pub supergraph_request: Arc<http::Request<graphql::Request>>,
57
58 pub subgraph_request: http::Request<graphql::Request>,
59
60 pub operation_kind: OperationKind,
61
62 pub context: Context,
63
64 pub(crate) subgraph_name: String,
66 pub(crate) subscription_stream: Option<mpsc::Sender<BoxGqlStream>>,
68 pub(crate) connection_closed_signal: Option<broadcast::Receiver<()>>,
70
71 pub(crate) query_hash: Arc<QueryHash>,
72
73 pub(crate) authorization: Arc<CacheKeyMetadata>,
75
76 pub(crate) executable_document: Option<Arc<Valid<apollo_compiler::ExecutableDocument>>>,
77
78 pub(crate) id: SubgraphRequestId,
80}
81
82#[buildstructor::buildstructor]
83impl Request {
84 #[builder(visibility = "pub")]
88 fn new(
89 supergraph_request: Arc<http::Request<graphql::Request>>,
90 subgraph_request: http::Request<graphql::Request>,
91 operation_kind: OperationKind,
92 context: Context,
93 subscription_stream: Option<mpsc::Sender<BoxGqlStream>>,
94 subgraph_name: String,
95 connection_closed_signal: Option<broadcast::Receiver<()>>,
96 executable_document: Option<Arc<Valid<apollo_compiler::ExecutableDocument>>>,
97 ) -> Request {
98 Self {
99 supergraph_request,
100 subgraph_request,
101 operation_kind,
102 context,
103 subgraph_name,
104 subscription_stream,
105 connection_closed_signal,
106 query_hash: QueryHash::default().into(),
110 authorization: Default::default(),
111 executable_document,
112 id: SubgraphRequestId::new(),
113 }
114 }
115
116 #[builder(visibility = "pub")]
122 fn fake_new(
123 supergraph_request: Option<Arc<http::Request<graphql::Request>>>,
124 subgraph_request: Option<http::Request<graphql::Request>>,
125 operation_kind: Option<OperationKind>,
126 context: Option<Context>,
127 subscription_stream: Option<mpsc::Sender<BoxGqlStream>>,
128 subgraph_name: Option<String>,
129 connection_closed_signal: Option<broadcast::Receiver<()>>,
130 ) -> Request {
131 Request::new(
132 supergraph_request.unwrap_or_default(),
133 subgraph_request.unwrap_or_default(),
134 operation_kind.unwrap_or(OperationKind::Query),
135 context.unwrap_or_default(),
136 subscription_stream,
137 subgraph_name.unwrap_or_default(),
138 connection_closed_signal,
139 None,
140 )
141 }
142}
143
144impl Clone for Request {
145 fn clone(&self) -> Self {
146 let mut builder = http::Request::builder()
149 .method(self.subgraph_request.method())
150 .version(self.subgraph_request.version())
151 .uri(self.subgraph_request.uri());
152
153 {
154 let headers = builder.headers_mut().unwrap();
155 headers.extend(
156 self.subgraph_request
157 .headers()
158 .iter()
159 .map(|(name, value)| (name.clone(), value.clone())),
160 );
161 }
162 let subgraph_request = builder.body(self.subgraph_request.body().clone()).unwrap();
163
164 Self {
165 supergraph_request: self.supergraph_request.clone(),
166 subgraph_request,
167 operation_kind: self.operation_kind,
168 context: self.context.clone(),
169 subgraph_name: self.subgraph_name.clone(),
170 subscription_stream: self.subscription_stream.clone(),
171 connection_closed_signal: self
172 .connection_closed_signal
173 .as_ref()
174 .map(|s| s.resubscribe()),
175 query_hash: self.query_hash.clone(),
176 authorization: self.authorization.clone(),
177 executable_document: self.executable_document.clone(),
178 id: self.id.clone(),
179 }
180 }
181}
182
183impl SubgraphRequestId {
184 pub fn new() -> Self {
185 SubgraphRequestId(
186 uuid::Uuid::new_v4()
187 .as_hyphenated()
188 .encode_lower(&mut uuid::Uuid::encode_buffer())
189 .to_string(),
190 )
191 }
192}
193
194impl std::ops::Deref for SubgraphRequestId {
195 type Target = str;
196
197 fn deref(&self) -> &str {
198 &self.0
199 }
200}
201
202impl Default for SubgraphRequestId {
203 fn default() -> Self {
204 Self::new()
205 }
206}
207
208assert_impl_all!(Response: Send);
209#[derive(Debug)]
210#[non_exhaustive]
211pub struct Response {
212 pub response: http::Response<graphql::Response>,
213 pub(crate) subgraph_name: String,
215 pub context: Context,
216 pub(crate) id: SubgraphRequestId,
218}
219
220#[buildstructor::buildstructor]
221impl Response {
222 pub(crate) fn new_from_response(
227 response: http::Response<graphql::Response>,
228 context: Context,
229 subgraph_name: String,
230 id: SubgraphRequestId,
231 ) -> Self {
232 Self {
233 response,
234 context,
235 subgraph_name,
236 id,
237 }
238 }
239
240 #[builder(visibility = "pub")]
245 fn new(
246 label: Option<String>,
247 data: Option<Value>,
248 path: Option<Path>,
249 errors: Vec<Error>,
250 extensions: Object,
251 status_code: Option<StatusCode>,
252 context: Context,
253 headers: Option<http::HeaderMap<http::HeaderValue>>,
254 subgraph_name: String,
255 id: Option<SubgraphRequestId>,
256 ) -> Self {
257 let res = graphql::Response::builder()
259 .and_label(label)
260 .data(data.unwrap_or_default())
261 .and_path(path)
262 .errors(errors)
263 .extensions(extensions)
264 .build();
265
266 let mut response = http::Response::builder()
268 .status(status_code.unwrap_or(StatusCode::OK))
269 .body(res)
270 .expect("Response is serializable; qed");
271
272 *response.headers_mut() = headers.unwrap_or_default();
273
274 let id = id.unwrap_or_default();
278
279 Self {
280 response,
281 context,
282 subgraph_name,
283 id,
284 }
285 }
286
287 #[builder(visibility = "pub")]
293 fn fake_new(
294 label: Option<String>,
295 data: Option<Value>,
296 path: Option<Path>,
297 errors: Vec<Error>,
298 extensions: JsonMap<ByteString, Value>,
300 status_code: Option<StatusCode>,
301 context: Option<Context>,
302 headers: Option<http::HeaderMap<http::HeaderValue>>,
303 subgraph_name: Option<String>,
304 id: Option<SubgraphRequestId>,
305 ) -> Self {
306 Self::new(
307 label,
308 data,
309 path,
310 errors,
311 extensions,
312 status_code,
313 context.unwrap_or_default(),
314 headers,
315 subgraph_name.unwrap_or_default(),
316 id,
317 )
318 }
319
320 #[builder(visibility = "pub")]
327 fn fake2_new(
328 label: Option<String>,
329 data: Option<Value>,
330 path: Option<Path>,
331 errors: Vec<Error>,
332 extensions: JsonMap<ByteString, Value>,
334 status_code: Option<StatusCode>,
335 context: Option<Context>,
336 headers: MultiMap<TryIntoHeaderName, TryIntoHeaderValue>,
337 subgraph_name: Option<String>,
338 id: Option<SubgraphRequestId>,
339 ) -> Result<Response, BoxError> {
340 Ok(Self::new(
341 label,
342 data,
343 path,
344 errors,
345 extensions,
346 status_code,
347 context.unwrap_or_default(),
348 Some(header_map(headers)?),
349 subgraph_name.unwrap_or_default(),
350 id,
351 ))
352 }
353
354 #[builder(visibility = "pub")]
358 fn error_new(
359 errors: Vec<Error>,
360 status_code: Option<StatusCode>,
361 context: Context,
362 subgraph_name: String,
363 id: Option<SubgraphRequestId>,
364 ) -> Self {
365 Self::new(
366 Default::default(),
367 Default::default(),
368 Default::default(),
369 errors,
370 Default::default(),
371 status_code,
372 context,
373 Default::default(),
374 subgraph_name,
375 id,
376 )
377 }
378}
379
380impl Request {
381 pub(crate) fn to_sha256(&self, ignored_headers: &HashSet<String>) -> String {
382 let mut hasher = Sha256::new();
383 let http_req = &self.subgraph_request;
384 hasher.update(http_req.method().as_str().as_bytes());
385
386 let version = match http_req.version() {
388 Version::HTTP_09 => "HTTP/0.9",
389 Version::HTTP_10 => "HTTP/1.0",
390 Version::HTTP_11 => "HTTP/1.1",
391 Version::HTTP_2 => "HTTP/2.0",
392 Version::HTTP_3 => "HTTP/3.0",
393 _ => "unknown",
394 };
395 hasher.update(version.as_bytes());
396 let uri = http_req.uri();
397 if let Some(scheme) = uri.scheme() {
398 hasher.update(scheme.as_str().as_bytes());
399 }
400 if let Some(authority) = uri.authority() {
401 hasher.update(authority.as_str().as_bytes());
402 }
403 if let Some(query) = uri.query() {
404 hasher.update(query.as_bytes());
405 }
406
407 for (name, value) in http_req
409 .headers()
410 .iter()
411 .filter(|(name, _)| !ignored_headers.contains(name.as_str()))
412 {
413 hasher.update(name.as_str().as_bytes());
414 hasher.update(value.to_str().unwrap_or("ERROR").as_bytes());
415 }
416 if let Some(claim) = self
417 .context
418 .get_json_value(APOLLO_AUTHENTICATION_JWT_CLAIMS)
419 {
420 hasher.update(format!("{claim:?}").as_bytes());
421 }
422 let body = http_req.body();
423 if let Some(operation_name) = &body.operation_name {
424 hasher.update(operation_name.as_bytes());
425 }
426 if let Some(query) = &body.query {
427 hasher.update(query.as_bytes());
428 }
429 for (var_name, var_value) in &body.variables {
430 hasher.update(var_name.inner());
431 hasher.update(var_value.to_bytes());
432 }
433 for (name, val) in &body.extensions {
434 hasher.update(name.inner());
435 hasher.update(val.to_bytes());
436 }
437
438 hex::encode(hasher.finalize())
439 }
440}
441
442#[cfg(test)]
443mod tests {
444 use super::*;
445
446 #[test]
447 fn test_subgraph_request_hash() {
448 let subgraph_req_1 = Request::fake_builder()
449 .subgraph_request(
450 http::Request::builder()
451 .header("public_header", "value")
452 .header("auth", "my_token")
453 .body(graphql::Request::default())
454 .unwrap(),
455 )
456 .build();
457 let subgraph_req_2 = Request::fake_builder()
458 .subgraph_request(
459 http::Request::builder()
460 .header("public_header", "value_bis")
461 .header("auth", "my_token")
462 .body(graphql::Request::default())
463 .unwrap(),
464 )
465 .build();
466 let mut ignored_headers = HashSet::new();
467 ignored_headers.insert("public_header".to_string());
468 assert_eq!(
469 subgraph_req_1.to_sha256(&ignored_headers),
470 subgraph_req_2.to_sha256(&ignored_headers)
471 );
472
473 let subgraph_req_1 = Request::fake_builder()
474 .subgraph_request(
475 http::Request::builder()
476 .header("public_header", "value")
477 .header("auth", "my_token")
478 .body(graphql::Request::default())
479 .unwrap(),
480 )
481 .build();
482 let subgraph_req_2 = Request::fake_builder()
483 .subgraph_request(
484 http::Request::builder()
485 .header("public_header", "value_bis")
486 .header("auth", "my_token")
487 .body(graphql::Request::default())
488 .unwrap(),
489 )
490 .build();
491 let ignored_headers = HashSet::new();
492 assert_ne!(
493 subgraph_req_1.to_sha256(&ignored_headers),
494 subgraph_req_2.to_sha256(&ignored_headers)
495 );
496 }
497}