apollo_router/services/
subgraph.rs

1#![allow(missing_docs)] // FIXME
2
3use std::collections::HashSet;
4use std::fmt::Display;
5use std::pin::Pin;
6use std::sync::Arc;
7
8use apollo_compiler::validation::Valid;
9use http::StatusCode;
10use http::Version;
11use multimap::MultiMap;
12use serde::Deserialize;
13use serde::Serialize;
14use serde_json_bytes::ByteString;
15use serde_json_bytes::Map as JsonMap;
16use serde_json_bytes::Value;
17use sha2::Digest;
18use sha2::Sha256;
19use static_assertions::assert_impl_all;
20use tokio::sync::broadcast;
21use tokio::sync::mpsc;
22use tokio_stream::Stream;
23use tower::BoxError;
24
25use crate::Context;
26use crate::error::Error;
27use crate::graphql;
28use crate::http_ext::TryIntoHeaderName;
29use crate::http_ext::TryIntoHeaderValue;
30use crate::http_ext::header_map;
31use crate::json_ext::Object;
32use crate::json_ext::Path;
33use crate::plugins::authentication::APOLLO_AUTHENTICATION_JWT_CLAIMS;
34use crate::plugins::authorization::CacheKeyMetadata;
35use crate::query_planner::fetch::OperationKind;
36use crate::spec::QueryHash;
37
/// Type-erased `tower` service that accepts a subgraph [`Request`] and resolves to a
/// [`Response`], or to a [`BoxError`] on failure.
pub type BoxService = tower::util::BoxService<Request, Response, BoxError>;
/// Same request, response and error types as [`BoxService`], based on
/// `tower::util::BoxCloneService`.
pub type BoxCloneService = tower::util::BoxCloneService<Request, Response, BoxError>;
/// Outcome of a subgraph service call: a [`Response`] on success, a [`BoxError`] otherwise.
pub type ServiceResult = Result<Response, BoxError>;
/// Pinned, boxed, `Send + Sync` stream of [`graphql::Response`] items.
///
/// This is the stream type carried by the `subscription_stream` channel of [`Request`].
pub(crate) type BoxGqlStream = Pin<Box<dyn Stream<Item = graphql::Response> + Send + Sync>>;
/// Unique id for a subgraph request and the related response.
///
/// Newtype over the id's `String` form. [`SubgraphRequestId::new`] fills it with a freshly
/// generated v4 UUID rendered as a lowercase hyphenated string.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct SubgraphRequestId(pub String);
45
46impl Display for SubgraphRequestId {
47    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
48        write!(f, "{}", self.0)
49    }
50}
51
// Compile-time check that `Request` is `Send`.
assert_impl_all!(Request: Send);
/// Request handled by a subgraph service.
///
/// Bundles the original request received by the Router, the request addressed to the
/// subgraph, and router-internal metadata attached to that request.
#[non_exhaustive]
pub struct Request {
    /// Original request to the Router.
    pub supergraph_request: Arc<http::Request<graphql::Request>>,

    /// Request addressed to the subgraph. Its method, version, URI, headers and body are what
    /// `to_sha256` hashes and what `Clone` copies (HTTP extensions are not copied).
    pub subgraph_request: http::Request<graphql::Request>,

    /// Kind of GraphQL operation. The fake builder falls back to `OperationKind::Query`.
    pub operation_kind: OperationKind,

    /// Context attached to this request. `to_sha256` reads the JWT claims from it.
    pub context: Context,

    /// Name of the subgraph
    pub(crate) subgraph_name: String,
    /// Channel to send the subscription stream to listen on events coming from subgraph in a task
    pub(crate) subscription_stream: Option<mpsc::Sender<BoxGqlStream>>,
    /// Channel triggered when the client connection has been dropped.
    /// Cloning the request re-subscribes to the channel instead of sharing the receiver.
    pub(crate) connection_closed_signal: Option<broadcast::Receiver<()>>,

    /// Query hash for this request. The builders leave it at `QueryHash::default()`; per the
    /// comment in `Request::new`, the real value is filled in by the query planner.
    pub(crate) query_hash: Arc<QueryHash>,

    /// Authorization metadata for this request. The builders start from the default value.
    pub(crate) authorization: Arc<CacheKeyMetadata>,

    /// Validated executable document, when one was supplied to the builder (the fake builder
    /// always passes `None`).
    // NOTE(review): presumably the parsed subgraph operation — confirm against the caller.
    pub(crate) executable_document: Option<Arc<Valid<apollo_compiler::ExecutableDocument>>>,

    /// unique id for this request
    pub(crate) id: SubgraphRequestId,
}
81
82#[buildstructor::buildstructor]
83impl Request {
84    /// This is the constructor (or builder) to use when constructing a real Request.
85    ///
86    /// Required parameters are required in non-testing code to create a Request.
87    #[builder(visibility = "pub")]
88    fn new(
89        supergraph_request: Arc<http::Request<graphql::Request>>,
90        subgraph_request: http::Request<graphql::Request>,
91        operation_kind: OperationKind,
92        context: Context,
93        subscription_stream: Option<mpsc::Sender<BoxGqlStream>>,
94        subgraph_name: String,
95        connection_closed_signal: Option<broadcast::Receiver<()>>,
96        executable_document: Option<Arc<Valid<apollo_compiler::ExecutableDocument>>>,
97    ) -> Request {
98        Self {
99            supergraph_request,
100            subgraph_request,
101            operation_kind,
102            context,
103            subgraph_name,
104            subscription_stream,
105            connection_closed_signal,
106            // It's NOT GREAT! to have an empty hash value here.
107            // This value is populated based on the subgraph query hash in the query planner code.
108            // At the time of writing it's in `crate::query_planner::fetch::FetchNode::fetch_node`.
109            query_hash: QueryHash::default().into(),
110            authorization: Default::default(),
111            executable_document,
112            id: SubgraphRequestId::new(),
113        }
114    }
115
116    /// This is the constructor (or builder) to use when constructing a "fake" Request.
117    ///
118    /// This does not enforce the provision of the data that is required for a fully functional
119    /// Request. It's usually enough for testing, when a fully consructed Request is
120    /// difficult to construct and not required for the pusposes of the test.
121    #[builder(visibility = "pub")]
122    fn fake_new(
123        supergraph_request: Option<Arc<http::Request<graphql::Request>>>,
124        subgraph_request: Option<http::Request<graphql::Request>>,
125        operation_kind: Option<OperationKind>,
126        context: Option<Context>,
127        subscription_stream: Option<mpsc::Sender<BoxGqlStream>>,
128        subgraph_name: Option<String>,
129        connection_closed_signal: Option<broadcast::Receiver<()>>,
130    ) -> Request {
131        Request::new(
132            supergraph_request.unwrap_or_default(),
133            subgraph_request.unwrap_or_default(),
134            operation_kind.unwrap_or(OperationKind::Query),
135            context.unwrap_or_default(),
136            subscription_stream,
137            subgraph_name.unwrap_or_default(),
138            connection_closed_signal,
139            None,
140        )
141    }
142}
143
144impl Clone for Request {
145    fn clone(&self) -> Self {
146        // http::Request is not clonable so we have to rebuild a new one
147        // we don't use the extensions field for now
148        let mut builder = http::Request::builder()
149            .method(self.subgraph_request.method())
150            .version(self.subgraph_request.version())
151            .uri(self.subgraph_request.uri());
152
153        {
154            let headers = builder.headers_mut().unwrap();
155            headers.extend(
156                self.subgraph_request
157                    .headers()
158                    .iter()
159                    .map(|(name, value)| (name.clone(), value.clone())),
160            );
161        }
162        let subgraph_request = builder.body(self.subgraph_request.body().clone()).unwrap();
163
164        Self {
165            supergraph_request: self.supergraph_request.clone(),
166            subgraph_request,
167            operation_kind: self.operation_kind,
168            context: self.context.clone(),
169            subgraph_name: self.subgraph_name.clone(),
170            subscription_stream: self.subscription_stream.clone(),
171            connection_closed_signal: self
172                .connection_closed_signal
173                .as_ref()
174                .map(|s| s.resubscribe()),
175            query_hash: self.query_hash.clone(),
176            authorization: self.authorization.clone(),
177            executable_document: self.executable_document.clone(),
178            id: self.id.clone(),
179        }
180    }
181}
182
183impl SubgraphRequestId {
184    pub fn new() -> Self {
185        SubgraphRequestId(
186            uuid::Uuid::new_v4()
187                .as_hyphenated()
188                .encode_lower(&mut uuid::Uuid::encode_buffer())
189                .to_string(),
190        )
191    }
192}
193
194impl std::ops::Deref for SubgraphRequestId {
195    type Target = str;
196
197    fn deref(&self) -> &str {
198        &self.0
199    }
200}
201
202impl Default for SubgraphRequestId {
203    fn default() -> Self {
204        Self::new()
205    }
206}
207
// Compile-time check that `Response` is `Send`.
assert_impl_all!(Response: Send);
/// Response produced by a subgraph service call.
#[derive(Debug)]
#[non_exhaustive]
pub struct Response {
    /// HTTP response whose body is the GraphQL response.
    pub response: http::Response<graphql::Response>,
    /// Name of the subgraph
    pub(crate) subgraph_name: String,
    /// Context attached to this response.
    pub context: Context,
    /// unique id matching the corresponding field in the request
    pub(crate) id: SubgraphRequestId,
}
219
220#[buildstructor::buildstructor]
221impl Response {
222    /// This is the constructor to use when constructing a real Response..
223    ///
224    /// In this case, you already have a valid response and just wish to associate it with a context
225    /// and create a Response.
226    pub(crate) fn new_from_response(
227        response: http::Response<graphql::Response>,
228        context: Context,
229        subgraph_name: String,
230        id: SubgraphRequestId,
231    ) -> Self {
232        Self {
233            response,
234            context,
235            subgraph_name,
236            id,
237        }
238    }
239
240    /// This is the constructor (or builder) to use when constructing a real Response.
241    ///
242    /// The parameters are not optional, because in a live situation all of these properties must be
243    /// set and be correct to create a Response.
244    #[builder(visibility = "pub")]
245    fn new(
246        label: Option<String>,
247        data: Option<Value>,
248        path: Option<Path>,
249        errors: Vec<Error>,
250        extensions: Object,
251        status_code: Option<StatusCode>,
252        context: Context,
253        headers: Option<http::HeaderMap<http::HeaderValue>>,
254        subgraph_name: String,
255        id: Option<SubgraphRequestId>,
256    ) -> Self {
257        // Build a response
258        let res = graphql::Response::builder()
259            .and_label(label)
260            .data(data.unwrap_or_default())
261            .and_path(path)
262            .errors(errors)
263            .extensions(extensions)
264            .build();
265
266        // Build an http Response
267        let mut response = http::Response::builder()
268            .status(status_code.unwrap_or(StatusCode::OK))
269            .body(res)
270            .expect("Response is serializable; qed");
271
272        *response.headers_mut() = headers.unwrap_or_default();
273
274        // Warning: the id argument for this builder is an Option to make that a non breaking change
275        // but this means that if a subgraph response is created explicitly without an id, it will
276        // be generated here and not match the id from the subgraph request
277        let id = id.unwrap_or_default();
278
279        Self {
280            response,
281            context,
282            subgraph_name,
283            id,
284        }
285    }
286
287    /// This is the constructor (or builder) to use when constructing a "fake" Response.
288    ///
289    /// This does not enforce the provision of the data that is required for a fully functional
290    /// Response. It's usually enough for testing, when a fully constructed Response is
291    /// difficult to construct and not required for the purposes of the test.
292    #[builder(visibility = "pub")]
293    fn fake_new(
294        label: Option<String>,
295        data: Option<Value>,
296        path: Option<Path>,
297        errors: Vec<Error>,
298        // Skip the `Object` type alias in order to use buildstructor’s map special-casing
299        extensions: JsonMap<ByteString, Value>,
300        status_code: Option<StatusCode>,
301        context: Option<Context>,
302        headers: Option<http::HeaderMap<http::HeaderValue>>,
303        subgraph_name: Option<String>,
304        id: Option<SubgraphRequestId>,
305    ) -> Self {
306        Self::new(
307            label,
308            data,
309            path,
310            errors,
311            extensions,
312            status_code,
313            context.unwrap_or_default(),
314            headers,
315            subgraph_name.unwrap_or_default(),
316            id,
317        )
318    }
319
320    /// This is the constructor (or builder) to use when constructing a "fake" Response.
321    /// It differs from the existing fake_new because it allows easier passing of headers. However we can't change the original without breaking the public APIs.
322    ///
323    /// This does not enforce the provision of the data that is required for a fully functional
324    /// Response. It's usually enough for testing, when a fully constructed Response is
325    /// difficult to construct and not required for the purposes of the test.
326    #[builder(visibility = "pub")]
327    fn fake2_new(
328        label: Option<String>,
329        data: Option<Value>,
330        path: Option<Path>,
331        errors: Vec<Error>,
332        // Skip the `Object` type alias in order to use buildstructor’s map special-casing
333        extensions: JsonMap<ByteString, Value>,
334        status_code: Option<StatusCode>,
335        context: Option<Context>,
336        headers: MultiMap<TryIntoHeaderName, TryIntoHeaderValue>,
337        subgraph_name: Option<String>,
338        id: Option<SubgraphRequestId>,
339    ) -> Result<Response, BoxError> {
340        Ok(Self::new(
341            label,
342            data,
343            path,
344            errors,
345            extensions,
346            status_code,
347            context.unwrap_or_default(),
348            Some(header_map(headers)?),
349            subgraph_name.unwrap_or_default(),
350            id,
351        ))
352    }
353
354    /// This is the constructor (or builder) to use when constructing a Response that represents a global error.
355    /// It has no path and no response data.
356    /// This is useful for things such as authentication errors.
357    #[builder(visibility = "pub")]
358    fn error_new(
359        errors: Vec<Error>,
360        status_code: Option<StatusCode>,
361        context: Context,
362        subgraph_name: String,
363        id: Option<SubgraphRequestId>,
364    ) -> Self {
365        Self::new(
366            Default::default(),
367            Default::default(),
368            Default::default(),
369            errors,
370            Default::default(),
371            status_code,
372            context,
373            Default::default(),
374            subgraph_name,
375            id,
376        )
377    }
378}
379
380impl Request {
381    pub(crate) fn to_sha256(&self, ignored_headers: &HashSet<String>) -> String {
382        let mut hasher = Sha256::new();
383        let http_req = &self.subgraph_request;
384        hasher.update(http_req.method().as_str().as_bytes());
385
386        // To not allocate
387        let version = match http_req.version() {
388            Version::HTTP_09 => "HTTP/0.9",
389            Version::HTTP_10 => "HTTP/1.0",
390            Version::HTTP_11 => "HTTP/1.1",
391            Version::HTTP_2 => "HTTP/2.0",
392            Version::HTTP_3 => "HTTP/3.0",
393            _ => "unknown",
394        };
395        hasher.update(version.as_bytes());
396        let uri = http_req.uri();
397        if let Some(scheme) = uri.scheme() {
398            hasher.update(scheme.as_str().as_bytes());
399        }
400        if let Some(authority) = uri.authority() {
401            hasher.update(authority.as_str().as_bytes());
402        }
403        if let Some(query) = uri.query() {
404            hasher.update(query.as_bytes());
405        }
406
407        // this assumes headers are in the same order
408        for (name, value) in http_req
409            .headers()
410            .iter()
411            .filter(|(name, _)| !ignored_headers.contains(name.as_str()))
412        {
413            hasher.update(name.as_str().as_bytes());
414            hasher.update(value.to_str().unwrap_or("ERROR").as_bytes());
415        }
416        if let Some(claim) = self
417            .context
418            .get_json_value(APOLLO_AUTHENTICATION_JWT_CLAIMS)
419        {
420            hasher.update(format!("{claim:?}").as_bytes());
421        }
422        let body = http_req.body();
423        if let Some(operation_name) = &body.operation_name {
424            hasher.update(operation_name.as_bytes());
425        }
426        if let Some(query) = &body.query {
427            hasher.update(query.as_bytes());
428        }
429        for (var_name, var_value) in &body.variables {
430            hasher.update(var_name.inner());
431            hasher.update(var_value.to_bytes());
432        }
433        for (name, val) in &body.extensions {
434            hasher.update(name.inner());
435            hasher.update(val.to_bytes());
436        }
437
438        hex::encode(hasher.finalize())
439    }
440}
441
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a fake subgraph request with a fixed `auth` header and the given value for
    /// `public_header`.
    fn request_with_public_header(value: &str) -> Request {
        let http_request = http::Request::builder()
            .header("public_header", value)
            .header("auth", "my_token")
            .body(graphql::Request::default())
            .unwrap();

        Request::fake_builder()
            .subgraph_request(http_request)
            .build()
    }

    #[test]
    fn test_subgraph_request_hash() {
        // The two requests only differ by the value of `public_header`.
        let first = request_with_public_header("value");
        let second = request_with_public_header("value_bis");

        // When that header is ignored, both requests must hash to the same value.
        let ignored_headers = HashSet::from(["public_header".to_string()]);
        assert_eq!(
            first.to_sha256(&ignored_headers),
            second.to_sha256(&ignored_headers)
        );

        // When nothing is ignored, the differing header value must change the hash.
        let first = request_with_public_header("value");
        let second = request_with_public_header("value_bis");
        let ignored_headers = HashSet::new();
        assert_ne!(
            first.to_sha256(&ignored_headers),
            second.to_sha256(&ignored_headers)
        );
    }
}