// apollo_router/services/subgraph.rs

1#![allow(missing_docs)] // FIXME
2
3use std::collections::HashSet;
4use std::fmt::Display;
5use std::pin::Pin;
6use std::sync::Arc;
7use std::time::Duration;
8
9use apollo_compiler::validation::Valid;
10use http::StatusCode;
11use http::Version;
12use http::header::CACHE_CONTROL;
13use multimap::MultiMap;
14use serde::Deserialize;
15use serde::Serialize;
16use serde_json_bytes::ByteString;
17use serde_json_bytes::Map as JsonMap;
18use serde_json_bytes::Value;
19use sha2::Digest;
20use sha2::Sha256;
21use static_assertions::assert_impl_all;
22use tokio::sync::broadcast;
23use tokio::sync::mpsc;
24use tokio_stream::Stream;
25use tower::BoxError;
26
27use crate::Context;
28use crate::batching::BatchQuery;
29use crate::error::Error;
30use crate::graphql;
31use crate::http_ext::TryIntoHeaderName;
32use crate::http_ext::TryIntoHeaderValue;
33use crate::http_ext::header_map;
34use crate::json_ext::Object;
35use crate::json_ext::Path;
36use crate::plugins::authentication::APOLLO_AUTHENTICATION_JWT_CLAIMS;
37use crate::plugins::authorization::CacheKeyMetadata;
38use crate::plugins::response_cache::cache_control::CacheControl;
39use crate::query_planner::fetch::OperationKind;
40use crate::spec::QueryHash;
41
/// Boxed, type-erased `tower` service taking a subgraph [`Request`] and producing a
/// [`Response`] or a [`BoxError`].
pub type BoxService = tower::util::BoxService<Request, Response, BoxError>;
/// Same request/response/error types as [`BoxService`], built on `tower::util::BoxCloneService`.
pub type BoxCloneService = tower::util::BoxCloneService<Request, Response, BoxError>;
/// Outcome of a subgraph service call: a [`Response`] on success, a [`BoxError`] on failure.
pub type ServiceResult = Result<Response, BoxError>;
/// Pinned, boxed stream of GraphQL responses that is both `Send` and `Sync`.
pub(crate) type BoxGqlStream = Pin<Box<dyn Stream<Item = graphql::Response> + Send + Sync>>;
/// Unique id for a subgraph request and the related response.
///
/// Newtype around the id's `String` representation.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct SubgraphRequestId(pub String);
49
50impl Display for SubgraphRequestId {
51    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
52        write!(f, "{}", self.0)
53    }
54}
55
assert_impl_all!(Request: Send);
/// Request handed to a subgraph service.
///
/// Bundles the original request received by the Router, the HTTP request destined for the
/// subgraph, and the metadata that travels with it.
#[non_exhaustive]
pub struct Request {
    /// Original request to the Router.
    pub supergraph_request: Arc<http::Request<graphql::Request>>,

    /// HTTP request, with a GraphQL request as its body, for the subgraph.
    pub subgraph_request: http::Request<graphql::Request>,

    /// Kind of operation carried by this request.
    pub operation_kind: OperationKind,

    /// Context attached to this request.
    pub context: Context,

    /// Name of the subgraph
    pub(crate) subgraph_name: String,
    /// Channel to send the subscription stream to listen on events coming from subgraph in a task
    pub(crate) subscription_stream: Option<mpsc::Sender<BoxGqlStream>>,
    /// Channel triggered when the client connection has been dropped
    pub(crate) connection_closed_signal: Option<broadcast::Receiver<()>>,

    /// Hash of the query. `Request::new` only stores a default value here; per the note in that
    /// constructor, the real value is populated by the query planner code.
    pub(crate) query_hash: Arc<QueryHash>,

    /// Authorization metadata for this request. `Request::new` initializes it to its default.
    pub(crate) authorization: Arc<CacheKeyMetadata>,

    /// Validated executable document, when one is available. It is used to look up the root
    /// fields of the operation (see `root_operation_fields`).
    pub(crate) executable_document: Option<Arc<Valid<apollo_compiler::ExecutableDocument>>>,

    /// unique id for this request
    pub(crate) id: SubgraphRequestId,
}
85
86#[buildstructor::buildstructor]
87impl Request {
88    /// This is the constructor (or builder) to use when constructing a real Request.
89    ///
90    /// Required parameters are required in non-testing code to create a Request.
91    #[builder(visibility = "pub")]
92    fn new(
93        supergraph_request: Arc<http::Request<graphql::Request>>,
94        subgraph_request: http::Request<graphql::Request>,
95        operation_kind: OperationKind,
96        context: Context,
97        subscription_stream: Option<mpsc::Sender<BoxGqlStream>>,
98        subgraph_name: String,
99        connection_closed_signal: Option<broadcast::Receiver<()>>,
100        executable_document: Option<Arc<Valid<apollo_compiler::ExecutableDocument>>>,
101    ) -> Request {
102        Self {
103            supergraph_request,
104            subgraph_request,
105            operation_kind,
106            context,
107            subgraph_name,
108            subscription_stream,
109            connection_closed_signal,
110            // It's NOT GREAT! to have an empty hash value here.
111            // This value is populated based on the subgraph query hash in the query planner code.
112            // At the time of writing it's in `crate::query_planner::fetch::FetchNode::fetch_node`.
113            query_hash: QueryHash::default().into(),
114            authorization: Default::default(),
115            executable_document,
116            id: SubgraphRequestId::new(),
117        }
118    }
119
120    /// This is the constructor (or builder) to use when constructing a "fake" Request.
121    ///
122    /// This does not enforce the provision of the data that is required for a fully functional
123    /// Request. It's usually enough for testing, when a fully consructed Request is
124    /// difficult to construct and not required for the pusposes of the test.
125    #[builder(visibility = "pub")]
126    fn fake_new(
127        supergraph_request: Option<Arc<http::Request<graphql::Request>>>,
128        subgraph_request: Option<http::Request<graphql::Request>>,
129        operation_kind: Option<OperationKind>,
130        context: Option<Context>,
131        subscription_stream: Option<mpsc::Sender<BoxGqlStream>>,
132        subgraph_name: Option<String>,
133        connection_closed_signal: Option<broadcast::Receiver<()>>,
134    ) -> Request {
135        Request::new(
136            supergraph_request.unwrap_or_default(),
137            subgraph_request.unwrap_or_default(),
138            operation_kind.unwrap_or(OperationKind::Query),
139            context.unwrap_or_default(),
140            subscription_stream,
141            subgraph_name.unwrap_or_default(),
142            connection_closed_signal,
143            None,
144        )
145    }
146
147    pub(crate) fn is_part_of_batch(&self) -> bool {
148        self.context
149            .extensions()
150            .with_lock(|lock| lock.contains_key::<BatchQuery>())
151    }
152
153    pub(crate) fn subgraph_operation_name(&self) -> Option<&str> {
154        self.subgraph_request.body().operation_name.as_deref()
155    }
156
157    pub(crate) fn root_operation_fields(&self) -> Vec<String> {
158        self.executable_document
159            .as_ref()
160            .and_then(|executable_document| {
161                let operation_name = self.subgraph_operation_name();
162                Some(
163                    executable_document
164                        .operations
165                        .get(operation_name)
166                        .ok()?
167                        .root_fields(executable_document)
168                        .map(|f| f.name.to_string())
169                        .collect(),
170                )
171            })
172            .unwrap_or_default()
173    }
174}
175
176impl Clone for Request {
177    fn clone(&self) -> Self {
178        // http::Request is not clonable so we have to rebuild a new one
179        // we don't use the extensions field for now
180        let mut builder = http::Request::builder()
181            .method(self.subgraph_request.method())
182            .version(self.subgraph_request.version())
183            .uri(self.subgraph_request.uri());
184
185        {
186            let headers = builder.headers_mut().unwrap();
187            headers.extend(
188                self.subgraph_request
189                    .headers()
190                    .iter()
191                    .map(|(name, value)| (name.clone(), value.clone())),
192            );
193        }
194        let subgraph_request = builder.body(self.subgraph_request.body().clone()).unwrap();
195
196        Self {
197            supergraph_request: self.supergraph_request.clone(),
198            subgraph_request,
199            operation_kind: self.operation_kind,
200            context: self.context.clone(),
201            subgraph_name: self.subgraph_name.clone(),
202            subscription_stream: self.subscription_stream.clone(),
203            connection_closed_signal: self
204                .connection_closed_signal
205                .as_ref()
206                .map(|s| s.resubscribe()),
207            query_hash: self.query_hash.clone(),
208            authorization: self.authorization.clone(),
209            executable_document: self.executable_document.clone(),
210            id: self.id.clone(),
211        }
212    }
213}
214
215impl SubgraphRequestId {
216    pub fn new() -> Self {
217        SubgraphRequestId(
218            uuid::Uuid::new_v4()
219                .as_hyphenated()
220                .encode_lower(&mut uuid::Uuid::encode_buffer())
221                .to_string(),
222        )
223    }
224}
225
226impl std::ops::Deref for SubgraphRequestId {
227    type Target = str;
228
229    fn deref(&self) -> &str {
230        &self.0
231    }
232}
233
234impl Default for SubgraphRequestId {
235    fn default() -> Self {
236        Self::new()
237    }
238}
239
assert_impl_all!(Response: Send);
/// Response produced by a subgraph service.
#[derive(Debug)]
#[non_exhaustive]
pub struct Response {
    /// HTTP response whose body is the GraphQL response.
    pub response: http::Response<graphql::Response>,
    /// Name of the subgraph
    pub(crate) subgraph_name: String,
    /// Context attached to this response.
    pub context: Context,
    /// unique id matching the corresponding field in the request
    pub(crate) id: SubgraphRequestId,
}
251
252#[buildstructor::buildstructor]
253impl Response {
254    /// This is the constructor to use when constructing a real Response..
255    ///
256    /// In this case, you already have a valid response and just wish to associate it with a context
257    /// and create a Response.
258    pub(crate) fn new_from_response(
259        response: http::Response<graphql::Response>,
260        context: Context,
261        subgraph_name: String,
262        id: SubgraphRequestId,
263    ) -> Self {
264        Self {
265            response,
266            context,
267            subgraph_name,
268            id,
269        }
270    }
271
272    /// This is the constructor (or builder) to use when constructing a real Response.
273    ///
274    /// The parameters are not optional, because in a live situation all of these properties must be
275    /// set and be correct to create a Response.
276    #[builder(visibility = "pub")]
277    fn new(
278        label: Option<String>,
279        data: Option<Value>,
280        path: Option<Path>,
281        errors: Vec<Error>,
282        extensions: Object,
283        status_code: Option<StatusCode>,
284        context: Context,
285        headers: Option<http::HeaderMap<http::HeaderValue>>,
286        subgraph_name: String,
287        id: Option<SubgraphRequestId>,
288    ) -> Self {
289        // Build a response
290        let res = graphql::Response::builder()
291            .and_label(label)
292            .data(data.unwrap_or_default())
293            .and_path(path)
294            .errors(errors)
295            .extensions(extensions)
296            .build();
297
298        // Build an http Response
299        let mut response = http::Response::builder()
300            .status(status_code.unwrap_or(StatusCode::OK))
301            .body(res)
302            .expect("Response is serializable; qed");
303
304        *response.headers_mut() = headers.unwrap_or_default();
305
306        // Warning: the id argument for this builder is an Option to make that a non breaking change
307        // but this means that if a subgraph response is created explicitly without an id, it will
308        // be generated here and not match the id from the subgraph request
309        let id = id.unwrap_or_default();
310
311        Self {
312            response,
313            context,
314            subgraph_name,
315            id,
316        }
317    }
318
319    /// This is the constructor (or builder) to use when constructing a "fake" Response.
320    ///
321    /// This does not enforce the provision of the data that is required for a fully functional
322    /// Response. It's usually enough for testing, when a fully constructed Response is
323    /// difficult to construct and not required for the purposes of the test.
324    #[builder(visibility = "pub")]
325    fn fake_new(
326        label: Option<String>,
327        data: Option<Value>,
328        path: Option<Path>,
329        errors: Vec<Error>,
330        // Skip the `Object` type alias in order to use buildstructor’s map special-casing
331        extensions: JsonMap<ByteString, Value>,
332        status_code: Option<StatusCode>,
333        context: Option<Context>,
334        headers: Option<http::HeaderMap<http::HeaderValue>>,
335        subgraph_name: Option<String>,
336        id: Option<SubgraphRequestId>,
337    ) -> Self {
338        Self::new(
339            label,
340            data,
341            path,
342            errors,
343            extensions,
344            status_code,
345            context.unwrap_or_default(),
346            headers,
347            subgraph_name.unwrap_or_default(),
348            id,
349        )
350    }
351
352    /// This is the constructor (or builder) to use when constructing a "fake" Response.
353    /// It differs from the existing fake_new because it allows easier passing of headers. However we can't change the original without breaking the public APIs.
354    ///
355    /// This does not enforce the provision of the data that is required for a fully functional
356    /// Response. It's usually enough for testing, when a fully constructed Response is
357    /// difficult to construct and not required for the purposes of the test.
358    #[builder(visibility = "pub")]
359    fn fake2_new(
360        label: Option<String>,
361        data: Option<Value>,
362        path: Option<Path>,
363        errors: Vec<Error>,
364        // Skip the `Object` type alias in order to use buildstructor’s map special-casing
365        extensions: JsonMap<ByteString, Value>,
366        status_code: Option<StatusCode>,
367        context: Option<Context>,
368        headers: MultiMap<TryIntoHeaderName, TryIntoHeaderValue>,
369        subgraph_name: Option<String>,
370        id: Option<SubgraphRequestId>,
371    ) -> Result<Response, BoxError> {
372        Ok(Self::new(
373            label,
374            data,
375            path,
376            errors,
377            extensions,
378            status_code,
379            context.unwrap_or_default(),
380            Some(header_map(headers)?),
381            subgraph_name.unwrap_or_default(),
382            id,
383        ))
384    }
385
386    /// This is the constructor (or builder) to use when constructing a Response that represents a global error.
387    /// It has no path and no response data.
388    /// This is useful for things such as authentication errors.
389    #[builder(visibility = "pub")]
390    fn error_new(
391        errors: Vec<Error>,
392        status_code: Option<StatusCode>,
393        context: Context,
394        subgraph_name: String,
395        id: Option<SubgraphRequestId>,
396    ) -> Self {
397        Self::new(
398            Default::default(),
399            Default::default(),
400            Default::default(),
401            errors,
402            Default::default(),
403            status_code,
404            context,
405            Default::default(),
406            subgraph_name,
407            id,
408        )
409    }
410
411    pub(crate) fn subgraph_cache_control(
412        &self,
413        default_ttl: Option<Duration>,
414    ) -> Result<CacheControl, BoxError> {
415        if self.response.headers().contains_key(&CACHE_CONTROL) {
416            CacheControl::new(self.response.headers(), default_ttl)
417        } else {
418            Ok(CacheControl::no_store())
419        }
420    }
421
422    pub(crate) fn get_from_extensions(&self, key: &str) -> Option<&Value> {
423        self.response.body().extensions.get(key)
424    }
425}
426
427impl Request {
428    pub(crate) fn to_sha256(&self, ignored_headers: &HashSet<String>) -> String {
429        let mut hasher = Sha256::new();
430        let http_req = &self.subgraph_request;
431        hasher.update(http_req.method().as_str().as_bytes());
432
433        // To not allocate
434        let version = match http_req.version() {
435            Version::HTTP_09 => "HTTP/0.9",
436            Version::HTTP_10 => "HTTP/1.0",
437            Version::HTTP_11 => "HTTP/1.1",
438            Version::HTTP_2 => "HTTP/2.0",
439            Version::HTTP_3 => "HTTP/3.0",
440            _ => "unknown",
441        };
442        hasher.update(version.as_bytes());
443        let uri = http_req.uri();
444        if let Some(scheme) = uri.scheme() {
445            hasher.update(scheme.as_str().as_bytes());
446        }
447        if let Some(authority) = uri.authority() {
448            hasher.update(authority.as_str().as_bytes());
449        }
450        if let Some(query) = uri.query() {
451            hasher.update(query.as_bytes());
452        }
453
454        // this assumes headers are in the same order
455        for (name, value) in http_req
456            .headers()
457            .iter()
458            .filter(|(name, _)| !ignored_headers.contains(name.as_str()))
459        {
460            hasher.update(name.as_str().as_bytes());
461            hasher.update(value.to_str().unwrap_or("ERROR").as_bytes());
462        }
463        if let Some(claim) = self
464            .context
465            .get_json_value(APOLLO_AUTHENTICATION_JWT_CLAIMS)
466        {
467            hasher.update(format!("{claim:?}").as_bytes());
468        }
469        let body = http_req.body();
470        if let Some(operation_name) = &body.operation_name {
471            hasher.update(operation_name.as_bytes());
472        }
473        if let Some(query) = &body.query {
474            hasher.update(query.as_bytes());
475        }
476        for (var_name, var_value) in &body.variables {
477            hasher.update(var_name.inner());
478            hasher.update(var_value.to_bytes());
479        }
480        for (name, val) in &body.extensions {
481            hasher.update(name.inner());
482            hasher.update(val.to_bytes());
483        }
484
485        hex::encode(hasher.finalize())
486    }
487}
488
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a fake subgraph request carrying a `public_header` set to `public_value` and a
    /// fixed `auth` header, with a default GraphQL body.
    fn request_with_public_header(public_value: &str) -> Request {
        let http_request = http::Request::builder()
            .header("public_header", public_value)
            .header("auth", "my_token")
            .body(graphql::Request::default())
            .unwrap();

        Request::fake_builder()
            .subgraph_request(http_request)
            .build()
    }

    /// Requests differing only by an ignored header share a hash; once that header is no
    /// longer ignored, their hashes differ.
    #[test]
    fn test_subgraph_request_hash() {
        let first = request_with_public_header("value");
        let second = request_with_public_header("value_bis");

        let public_header_ignored = HashSet::from(["public_header".to_string()]);
        assert_eq!(
            first.to_sha256(&public_header_ignored),
            second.to_sha256(&public_header_ignored)
        );

        let nothing_ignored = HashSet::new();
        assert_ne!(
            first.to_sha256(&nothing_ignored),
            second.to_sha256(&nothing_ignored)
        );
    }
}