Skip to main content

apollo_router/services/
subgraph.rs

1#![allow(missing_docs)] // FIXME
2
3use std::collections::HashSet;
4use std::fmt::Display;
5use std::pin::Pin;
6use std::sync::Arc;
7use std::time::Duration;
8
9use apollo_compiler::validation::Valid;
10use http::StatusCode;
11use http::Version;
12use http::header::CACHE_CONTROL;
13use multimap::MultiMap;
14use serde::Deserialize;
15use serde::Serialize;
16use serde_json_bytes::ByteString;
17use serde_json_bytes::Map as JsonMap;
18use serde_json_bytes::Value;
19use sha2::Digest;
20use sha2::Sha256;
21use static_assertions::assert_impl_all;
22use tokio::sync::broadcast;
23use tokio::sync::mpsc;
24use tokio_stream::Stream;
25use tower::BoxError;
26
27use crate::Context;
28use crate::batching::BatchQuery;
29use crate::error::Error;
30use crate::graphql;
31use crate::http_ext::TryIntoHeaderName;
32use crate::http_ext::TryIntoHeaderValue;
33use crate::http_ext::header_map;
34use crate::json_ext::Object;
35use crate::json_ext::Path;
36use crate::plugins::authentication::APOLLO_AUTHENTICATION_JWT_CLAIMS;
37use crate::plugins::authentication::subgraph::SigningParamsConfig;
38use crate::plugins::authorization::CacheKeyMetadata;
39use crate::plugins::response_cache::cache_control::CacheControl;
40use crate::query_planner::fetch::OperationKind;
41use crate::spec::QueryHash;
42
43pub type BoxService = tower::util::BoxService<Request, Response, BoxError>;
44pub type BoxCloneService = tower::util::BoxCloneService<Request, Response, BoxError>;
45pub type ServiceResult = Result<Response, BoxError>;
46pub(crate) type BoxGqlStream = Pin<Box<dyn Stream<Item = graphql::Response> + Send + Sync>>;
47/// unique id for a subgraph request and the related response
48#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
49pub struct SubgraphRequestId(pub String);
50
51impl Display for SubgraphRequestId {
52    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
53        write!(f, "{}", self.0)
54    }
55}
56
57assert_impl_all!(Request: Send);
58#[non_exhaustive]
59pub struct Request {
60    /// Original request to the Router.
61    pub supergraph_request: Arc<http::Request<graphql::Request>>,
62
63    pub subgraph_request: http::Request<graphql::Request>,
64
65    pub operation_kind: OperationKind,
66
67    pub context: Context,
68
69    /// Name of the subgraph
70    pub(crate) subgraph_name: String,
71    /// Channel to send the subscription stream to listen on events coming from subgraph in a task
72    pub(crate) subscription_stream: Option<mpsc::Sender<BoxGqlStream>>,
73    /// Channel triggered when the client connection has been dropped
74    pub(crate) connection_closed_signal: Option<broadcast::Receiver<()>>,
75
76    pub(crate) query_hash: Arc<QueryHash>,
77
78    // authorization metadata for this request
79    pub(crate) authorization: Arc<CacheKeyMetadata>,
80
81    pub(crate) executable_document: Option<Arc<Valid<apollo_compiler::ExecutableDocument>>>,
82
83    /// unique id for this request
84    pub(crate) id: SubgraphRequestId,
85}
86
87#[buildstructor::buildstructor]
88impl Request {
89    /// This is the constructor (or builder) to use when constructing a real Request.
90    ///
91    /// Required parameters are required in non-testing code to create a Request.
92    #[builder(visibility = "pub")]
93    fn new(
94        supergraph_request: Arc<http::Request<graphql::Request>>,
95        subgraph_request: http::Request<graphql::Request>,
96        operation_kind: OperationKind,
97        context: Context,
98        subscription_stream: Option<mpsc::Sender<BoxGqlStream>>,
99        subgraph_name: String,
100        connection_closed_signal: Option<broadcast::Receiver<()>>,
101        executable_document: Option<Arc<Valid<apollo_compiler::ExecutableDocument>>>,
102    ) -> Request {
103        Self {
104            supergraph_request,
105            subgraph_request,
106            operation_kind,
107            context,
108            subgraph_name,
109            subscription_stream,
110            connection_closed_signal,
111            // It's NOT GREAT! to have an empty hash value here.
112            // This value is populated based on the subgraph query hash in the query planner code.
113            // At the time of writing it's in `crate::query_planner::fetch::FetchNode::fetch_node`.
114            query_hash: QueryHash::default().into(),
115            authorization: Default::default(),
116            executable_document,
117            id: SubgraphRequestId::new(),
118        }
119    }
120
121    /// This is the constructor (or builder) to use when constructing a "fake" Request.
122    ///
123    /// This does not enforce the provision of the data that is required for a fully functional
124    /// Request. It's usually enough for testing, when a fully consructed Request is
125    /// difficult to construct and not required for the pusposes of the test.
126    #[builder(visibility = "pub")]
127    fn fake_new(
128        supergraph_request: Option<Arc<http::Request<graphql::Request>>>,
129        subgraph_request: Option<http::Request<graphql::Request>>,
130        operation_kind: Option<OperationKind>,
131        context: Option<Context>,
132        subscription_stream: Option<mpsc::Sender<BoxGqlStream>>,
133        subgraph_name: Option<String>,
134        connection_closed_signal: Option<broadcast::Receiver<()>>,
135    ) -> Request {
136        Request::new(
137            supergraph_request.unwrap_or_default(),
138            subgraph_request.unwrap_or_default(),
139            operation_kind.unwrap_or(OperationKind::Query),
140            context.unwrap_or_default(),
141            subscription_stream,
142            subgraph_name.unwrap_or_default(),
143            connection_closed_signal,
144            None,
145        )
146    }
147
148    pub(crate) fn is_part_of_batch(&self) -> bool {
149        self.context
150            .extensions()
151            .with_lock(|lock| lock.contains_key::<BatchQuery>())
152    }
153
154    pub(crate) fn subgraph_operation_name(&self) -> Option<&str> {
155        self.subgraph_request.body().operation_name.as_deref()
156    }
157
158    pub(crate) fn root_operation_fields(&self) -> Vec<String> {
159        self.executable_document
160            .as_ref()
161            .and_then(|executable_document| {
162                let operation_name = self.subgraph_operation_name();
163                Some(
164                    executable_document
165                        .operations
166                        .get(operation_name)
167                        .ok()?
168                        .root_fields(executable_document)
169                        .map(|f| f.name.to_string())
170                        .collect(),
171                )
172            })
173            .unwrap_or_default()
174    }
175}
176
177impl Clone for Request {
178    fn clone(&self) -> Self {
179        // http::Request is not clonable so we have to rebuild a new one
180        let mut builder = http::Request::builder()
181            .method(self.subgraph_request.method())
182            .version(self.subgraph_request.version())
183            .uri(self.subgraph_request.uri());
184
185        {
186            let headers = builder.headers_mut().unwrap();
187            headers.extend(
188                self.subgraph_request
189                    .headers()
190                    .iter()
191                    .map(|(name, value)| (name.clone(), value.clone())),
192            );
193        }
194        let mut subgraph_request = builder.body(self.subgraph_request.body().clone()).unwrap();
195        // Copy only Arc<SigningParamsConfig> so APQ probe requests can be signed.
196        //
197        // We deliberately avoid copying all extensions: some types (e.g. MultipartFormData
198        // in the file-uploads plugin) hold shared stream state that must not be shared with
199        // the APQ probe clone — draining the probe would exhaust the original on retry.
200        //
201        // If a new extension type needs to survive SubgraphRequest clones, add it here.
202        if let Some(signing_params) = self
203            .subgraph_request
204            .extensions()
205            .get::<Arc<SigningParamsConfig>>()
206            .cloned()
207        {
208            subgraph_request.extensions_mut().insert(signing_params);
209        }
210
211        Self {
212            supergraph_request: self.supergraph_request.clone(),
213            subgraph_request,
214            operation_kind: self.operation_kind,
215            context: self.context.clone(),
216            subgraph_name: self.subgraph_name.clone(),
217            subscription_stream: self.subscription_stream.clone(),
218            connection_closed_signal: self
219                .connection_closed_signal
220                .as_ref()
221                .map(|s| s.resubscribe()),
222            query_hash: self.query_hash.clone(),
223            authorization: self.authorization.clone(),
224            executable_document: self.executable_document.clone(),
225            id: self.id.clone(),
226        }
227    }
228}
229
230impl SubgraphRequestId {
231    pub fn new() -> Self {
232        SubgraphRequestId(
233            uuid::Uuid::new_v4()
234                .as_hyphenated()
235                .encode_lower(&mut uuid::Uuid::encode_buffer())
236                .to_string(),
237        )
238    }
239}
240
241impl std::ops::Deref for SubgraphRequestId {
242    type Target = str;
243
244    fn deref(&self) -> &str {
245        &self.0
246    }
247}
248
249impl Default for SubgraphRequestId {
250    fn default() -> Self {
251        Self::new()
252    }
253}
254
255assert_impl_all!(Response: Send);
256#[derive(Debug)]
257#[non_exhaustive]
258pub struct Response {
259    pub response: http::Response<graphql::Response>,
260    /// Name of the subgraph
261    pub(crate) subgraph_name: String,
262    pub context: Context,
263    /// unique id matching the corresponding field in the request
264    pub(crate) id: SubgraphRequestId,
265}
266
267#[buildstructor::buildstructor]
268impl Response {
269    /// This is the constructor to use when constructing a real Response..
270    ///
271    /// In this case, you already have a valid response and just wish to associate it with a context
272    /// and create a Response.
273    pub(crate) fn new_from_response(
274        response: http::Response<graphql::Response>,
275        context: Context,
276        subgraph_name: String,
277        id: SubgraphRequestId,
278    ) -> Self {
279        Self {
280            response,
281            context,
282            subgraph_name,
283            id,
284        }
285    }
286
287    /// This is the constructor (or builder) to use when constructing a real Response.
288    ///
289    /// The parameters are not optional, because in a live situation all of these properties must be
290    /// set and be correct to create a Response.
291    #[builder(visibility = "pub")]
292    fn new(
293        label: Option<String>,
294        data: Option<Value>,
295        path: Option<Path>,
296        errors: Vec<Error>,
297        extensions: Object,
298        status_code: Option<StatusCode>,
299        context: Context,
300        headers: Option<http::HeaderMap<http::HeaderValue>>,
301        subgraph_name: String,
302        id: Option<SubgraphRequestId>,
303    ) -> Self {
304        // Build a response
305        let res = graphql::Response::builder()
306            .and_label(label)
307            .data(data.unwrap_or_default())
308            .and_path(path)
309            .errors(errors)
310            .extensions(extensions)
311            .build();
312
313        // Build an http Response
314        let mut response = http::Response::builder()
315            .status(status_code.unwrap_or(StatusCode::OK))
316            .body(res)
317            .expect("Response is serializable; qed");
318
319        *response.headers_mut() = headers.unwrap_or_default();
320
321        // Warning: the id argument for this builder is an Option to make that a non breaking change
322        // but this means that if a subgraph response is created explicitly without an id, it will
323        // be generated here and not match the id from the subgraph request
324        let id = id.unwrap_or_default();
325
326        Self {
327            response,
328            context,
329            subgraph_name,
330            id,
331        }
332    }
333
334    /// This is the constructor (or builder) to use when constructing a "fake" Response.
335    ///
336    /// This does not enforce the provision of the data that is required for a fully functional
337    /// Response. It's usually enough for testing, when a fully constructed Response is
338    /// difficult to construct and not required for the purposes of the test.
339    #[builder(visibility = "pub")]
340    fn fake_new(
341        label: Option<String>,
342        data: Option<Value>,
343        path: Option<Path>,
344        errors: Vec<Error>,
345        // Skip the `Object` type alias in order to use buildstructor’s map special-casing
346        extensions: JsonMap<ByteString, Value>,
347        status_code: Option<StatusCode>,
348        context: Option<Context>,
349        headers: Option<http::HeaderMap<http::HeaderValue>>,
350        subgraph_name: Option<String>,
351        id: Option<SubgraphRequestId>,
352    ) -> Self {
353        Self::new(
354            label,
355            data,
356            path,
357            errors,
358            extensions,
359            status_code,
360            context.unwrap_or_default(),
361            headers,
362            subgraph_name.unwrap_or_default(),
363            id,
364        )
365    }
366
367    /// This is the constructor (or builder) to use when constructing a "fake" Response.
368    /// It differs from the existing fake_new because it allows easier passing of headers. However we can't change the original without breaking the public APIs.
369    ///
370    /// This does not enforce the provision of the data that is required for a fully functional
371    /// Response. It's usually enough for testing, when a fully constructed Response is
372    /// difficult to construct and not required for the purposes of the test.
373    #[builder(visibility = "pub")]
374    fn fake2_new(
375        label: Option<String>,
376        data: Option<Value>,
377        path: Option<Path>,
378        errors: Vec<Error>,
379        // Skip the `Object` type alias in order to use buildstructor’s map special-casing
380        extensions: JsonMap<ByteString, Value>,
381        status_code: Option<StatusCode>,
382        context: Option<Context>,
383        headers: MultiMap<TryIntoHeaderName, TryIntoHeaderValue>,
384        subgraph_name: Option<String>,
385        id: Option<SubgraphRequestId>,
386    ) -> Result<Response, BoxError> {
387        Ok(Self::new(
388            label,
389            data,
390            path,
391            errors,
392            extensions,
393            status_code,
394            context.unwrap_or_default(),
395            Some(header_map(headers)?),
396            subgraph_name.unwrap_or_default(),
397            id,
398        ))
399    }
400
401    /// This is the constructor (or builder) to use when constructing a Response that represents a global error.
402    /// It has no path and no response data.
403    /// This is useful for things such as authentication errors.
404    #[builder(visibility = "pub")]
405    fn error_new(
406        errors: Vec<Error>,
407        status_code: Option<StatusCode>,
408        context: Context,
409        subgraph_name: String,
410        id: Option<SubgraphRequestId>,
411    ) -> Self {
412        Self::new(
413            Default::default(),
414            Default::default(),
415            Default::default(),
416            errors,
417            Default::default(),
418            status_code,
419            context,
420            Default::default(),
421            subgraph_name,
422            id,
423        )
424    }
425
426    pub(crate) fn subgraph_cache_control(
427        &self,
428        default_ttl: Option<Duration>,
429    ) -> Result<CacheControl, BoxError> {
430        if self.response.headers().contains_key(&CACHE_CONTROL) {
431            CacheControl::new(self.response.headers(), default_ttl)
432        } else {
433            Ok(CacheControl::no_store())
434        }
435    }
436
437    pub(crate) fn get_from_extensions(&self, key: &str) -> Option<&Value> {
438        self.response.body().extensions.get(key)
439    }
440}
441
442impl Request {
443    pub(crate) fn to_sha256(
444        &self,
445        ignored_headers: &HashSet<String>,
446        ignore_auth_context: bool,
447    ) -> String {
448        let mut hasher = Sha256::new();
449        let http_req = &self.subgraph_request;
450        hasher.update(http_req.method().as_str().as_bytes());
451
452        // To not allocate
453        let version = match http_req.version() {
454            Version::HTTP_09 => "HTTP/0.9",
455            Version::HTTP_10 => "HTTP/1.0",
456            Version::HTTP_11 => "HTTP/1.1",
457            Version::HTTP_2 => "HTTP/2.0",
458            Version::HTTP_3 => "HTTP/3.0",
459            _ => "unknown",
460        };
461        hasher.update(version.as_bytes());
462        let uri = http_req.uri();
463        if let Some(scheme) = uri.scheme() {
464            hasher.update(scheme.as_str().as_bytes());
465        }
466        if let Some(authority) = uri.authority() {
467            hasher.update(authority.as_str().as_bytes());
468        }
469        if let Some(query) = uri.query() {
470            hasher.update(query.as_bytes());
471        }
472
473        // this assumes headers are in the same order
474        for (name, value) in http_req
475            .headers()
476            .iter()
477            .filter(|(name, _)| !ignored_headers.contains(name.as_str()))
478        {
479            hasher.update(name.as_str().as_bytes());
480            hasher.update(value.to_str().unwrap_or("ERROR").as_bytes());
481        }
482        if !ignore_auth_context
483            && let Some(claim) = self
484                .context
485                .get_json_value(APOLLO_AUTHENTICATION_JWT_CLAIMS)
486        {
487            hasher.update(format!("{claim:?}").as_bytes());
488        }
489        let body = http_req.body();
490        if let Some(operation_name) = &body.operation_name {
491            hasher.update(operation_name.as_bytes());
492        }
493        if let Some(query) = &body.query {
494            hasher.update(query.as_bytes());
495        }
496        for (var_name, var_value) in &body.variables {
497            hasher.update(var_name.inner());
498            hasher.update(var_value.to_bytes());
499        }
500        for (name, val) in &body.extensions {
501            hasher.update(name.inner());
502            hasher.update(val.to_bytes());
503        }
504
505        hex::encode(hasher.finalize())
506    }
507}
508
509#[cfg(test)]
510mod tests {
511    use super::*;
512
513    #[test]
514    fn test_subgraph_request_hash() {
515        let subgraph_req_1 = Request::fake_builder()
516            .subgraph_request(
517                http::Request::builder()
518                    .header("public_header", "value")
519                    .header("auth", "my_token")
520                    .body(graphql::Request::default())
521                    .unwrap(),
522            )
523            .build();
524        let subgraph_req_2 = Request::fake_builder()
525            .subgraph_request(
526                http::Request::builder()
527                    .header("public_header", "value_bis")
528                    .header("auth", "my_token")
529                    .body(graphql::Request::default())
530                    .unwrap(),
531            )
532            .build();
533        let mut ignored_headers = HashSet::new();
534        ignored_headers.insert("public_header".to_string());
535        assert_eq!(
536            subgraph_req_1.to_sha256(&ignored_headers, false),
537            subgraph_req_2.to_sha256(&ignored_headers, false)
538        );
539
540        let subgraph_req_1 = Request::fake_builder()
541            .subgraph_request(
542                http::Request::builder()
543                    .header("public_header", "value")
544                    .header("auth", "my_token")
545                    .body(graphql::Request::default())
546                    .unwrap(),
547            )
548            .build();
549        let subgraph_req_2 = Request::fake_builder()
550            .subgraph_request(
551                http::Request::builder()
552                    .header("public_header", "value_bis")
553                    .header("auth", "my_token")
554                    .body(graphql::Request::default())
555                    .unwrap(),
556            )
557            .build();
558        let ignored_headers = HashSet::new();
559        assert_ne!(
560            subgraph_req_1.to_sha256(&ignored_headers, false),
561            subgraph_req_2.to_sha256(&ignored_headers, false)
562        );
563    }
564
565    #[test]
566    fn test_subgraph_request_hash_ignore_auth_context() {
567        use serde_json_bytes::json;
568
569        // Build two requests with different JWT claims in context.
570        let req_with_claims_a = Request::fake_builder()
571            .subgraph_request(
572                http::Request::builder()
573                    .body(graphql::Request::default())
574                    .unwrap(),
575            )
576            .build();
577        req_with_claims_a
578            .context
579            .insert(APOLLO_AUTHENTICATION_JWT_CLAIMS, json!({"sub": "user-a"}))
580            .expect("insert JWT claims");
581
582        let req_with_claims_b = Request::fake_builder()
583            .subgraph_request(
584                http::Request::builder()
585                    .body(graphql::Request::default())
586                    .unwrap(),
587            )
588            .build();
589        req_with_claims_b
590            .context
591            .insert(APOLLO_AUTHENTICATION_JWT_CLAIMS, json!({"sub": "user-b"}))
592            .expect("insert JWT claims");
593
594        let ignored_headers = HashSet::new();
595
596        // Different claims → different hashes when auth context is included.
597        assert_ne!(
598            req_with_claims_a.to_sha256(&ignored_headers, false),
599            req_with_claims_b.to_sha256(&ignored_headers, false),
600            "requests with different JWT claims must hash differently by default"
601        );
602
603        // Same hash when auth context is ignored.
604        assert_eq!(
605            req_with_claims_a.to_sha256(&ignored_headers, true),
606            req_with_claims_b.to_sha256(&ignored_headers, true),
607            "requests with different JWT claims must hash identically when ignore_auth_context is true"
608        );
609    }
610
611    #[test]
612    fn test_clone_does_not_copy_arbitrary_subgraph_request_extensions() {
613        // The Clone impl copies only specific extension types needed for APQ retries
614        // (Arc<SigningParamsConfig> for SigV4 — see authentication/subgraph.rs for the
615        // positive test). Arbitrary types must NOT be copied: some extensions
616        // (e.g. MultipartFormData in file uploads) hold shared stream state, and copying
617        // them would cause the APQ probe clone to exhaust the stream before the retry.
618        #[derive(Clone, PartialEq, Debug)]
619        struct ShouldNotSurviveClone(u32);
620
621        let mut req = Request::fake_builder()
622            .subgraph_request(
623                http::Request::builder()
624                    .body(graphql::Request::default())
625                    .unwrap(),
626            )
627            .build();
628        req.subgraph_request
629            .extensions_mut()
630            .insert(ShouldNotSurviveClone(42));
631
632        let cloned = req.clone();
633        assert!(
634            cloned
635                .subgraph_request
636                .extensions()
637                .get::<ShouldNotSurviveClone>()
638                .is_none(),
639            "arbitrary extension types must not be copied when SubgraphRequest is cloned"
640        );
641    }
642}