Skip to main content

apollo_router/services/
subgraph.rs

1#![allow(missing_docs)] // FIXME
2
3use std::fmt::Display;
4use std::pin::Pin;
5use std::sync::Arc;
6
7use apollo_compiler::validation::Valid;
8use http::StatusCode;
9use http::Version;
10use multimap::MultiMap;
11use serde::Deserialize;
12use serde::Serialize;
13use serde_json_bytes::ByteString;
14use serde_json_bytes::Map as JsonMap;
15use serde_json_bytes::Value;
16use sha2::Digest;
17use sha2::Sha256;
18use static_assertions::assert_impl_all;
19use tokio::sync::broadcast;
20use tokio::sync::mpsc;
21use tokio_stream::Stream;
22use tower::BoxError;
23
24use crate::Context;
25use crate::error::Error;
26use crate::graphql;
27use crate::http_ext::TryIntoHeaderName;
28use crate::http_ext::TryIntoHeaderValue;
29use crate::http_ext::header_map;
30use crate::json_ext::Object;
31use crate::json_ext::Path;
32use crate::plugins::authentication::APOLLO_AUTHENTICATION_JWT_CLAIMS;
33use crate::plugins::authorization::CacheKeyMetadata;
34use crate::query_planner::fetch::OperationKind;
35use crate::spec::QueryHash;
36
37pub type BoxService = tower::util::BoxService<Request, Response, BoxError>;
38pub type BoxCloneService = tower::util::BoxCloneService<Request, Response, BoxError>;
39pub type ServiceResult = Result<Response, BoxError>;
40pub(crate) type BoxGqlStream = Pin<Box<dyn Stream<Item = graphql::Response> + Send + Sync>>;
41/// unique id for a subgraph request and the related response
42#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
43pub struct SubgraphRequestId(pub String);
44
45impl Display for SubgraphRequestId {
46    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
47        write!(f, "{}", self.0)
48    }
49}
50
51assert_impl_all!(Request: Send);
52#[non_exhaustive]
53pub struct Request {
54    /// Original request to the Router.
55    pub supergraph_request: Arc<http::Request<graphql::Request>>,
56
57    pub subgraph_request: http::Request<graphql::Request>,
58
59    pub operation_kind: OperationKind,
60
61    pub context: Context,
62
63    // FIXME for router 2.x
64    /// Name of the subgraph, it's an Option to not introduce breaking change
65    pub(crate) subgraph_name: Option<String>,
66    /// Channel to send the subscription stream to listen on events coming from subgraph in a task
67    pub(crate) subscription_stream: Option<mpsc::Sender<BoxGqlStream>>,
68    /// Channel triggered when the client connection has been dropped
69    pub(crate) connection_closed_signal: Option<broadcast::Receiver<()>>,
70
71    pub(crate) query_hash: Arc<QueryHash>,
72
73    // authorization metadata for this request
74    pub(crate) authorization: Arc<CacheKeyMetadata>,
75
76    pub(crate) executable_document: Option<Arc<Valid<apollo_compiler::ExecutableDocument>>>,
77
78    /// unique id for this request
79    pub(crate) id: SubgraphRequestId,
80}
81
82#[buildstructor::buildstructor]
83impl Request {
84    /// This is the constructor (or builder) to use when constructing a real Request.
85    ///
86    /// Required parameters are required in non-testing code to create a Request.
87    #[builder(visibility = "pub")]
88    fn new(
89        supergraph_request: Arc<http::Request<graphql::Request>>,
90        subgraph_request: http::Request<graphql::Request>,
91        operation_kind: OperationKind,
92        context: Context,
93        subscription_stream: Option<mpsc::Sender<BoxGqlStream>>,
94        subgraph_name: Option<String>,
95        connection_closed_signal: Option<broadcast::Receiver<()>>,
96    ) -> Request {
97        Self {
98            supergraph_request,
99            subgraph_request,
100            operation_kind,
101            context,
102            subgraph_name,
103            subscription_stream,
104            connection_closed_signal,
105            // It's NOT GREAT! to have an empty hash value here.
106            // This value is populated based on the subgraph query hash in the query planner code.
107            // At the time of writing it's in `crate::query_planner::fetch::FetchNode::fetch_node`.
108            query_hash: QueryHash::default().into(),
109            authorization: Default::default(),
110            executable_document: None,
111            id: SubgraphRequestId::new(),
112        }
113    }
114
115    /// This is the constructor (or builder) to use when constructing a "fake" Request.
116    ///
117    /// This does not enforce the provision of the data that is required for a fully functional
118    /// Request. It's usually enough for testing, when a fully consructed Request is
119    /// difficult to construct and not required for the pusposes of the test.
120    #[builder(visibility = "pub")]
121    fn fake_new(
122        supergraph_request: Option<Arc<http::Request<graphql::Request>>>,
123        subgraph_request: Option<http::Request<graphql::Request>>,
124        operation_kind: Option<OperationKind>,
125        context: Option<Context>,
126        subscription_stream: Option<mpsc::Sender<BoxGqlStream>>,
127        subgraph_name: Option<String>,
128        connection_closed_signal: Option<broadcast::Receiver<()>>,
129    ) -> Request {
130        Request::new(
131            supergraph_request.unwrap_or_default(),
132            subgraph_request.unwrap_or_default(),
133            operation_kind.unwrap_or(OperationKind::Query),
134            context.unwrap_or_default(),
135            subscription_stream,
136            subgraph_name,
137            connection_closed_signal,
138        )
139    }
140}
141
142impl Clone for Request {
143    fn clone(&self) -> Self {
144        // http::Request is not clonable so we have to rebuild a new one
145        // we don't use the extensions field for now
146        let mut builder = http::Request::builder()
147            .method(self.subgraph_request.method())
148            .version(self.subgraph_request.version())
149            .uri(self.subgraph_request.uri());
150
151        {
152            let headers = builder.headers_mut().unwrap();
153            headers.extend(
154                self.subgraph_request
155                    .headers()
156                    .iter()
157                    .map(|(name, value)| (name.clone(), value.clone())),
158            );
159        }
160        let subgraph_request = builder.body(self.subgraph_request.body().clone()).unwrap();
161
162        Self {
163            supergraph_request: self.supergraph_request.clone(),
164            subgraph_request,
165            operation_kind: self.operation_kind,
166            context: self.context.clone(),
167            subgraph_name: self.subgraph_name.clone(),
168            subscription_stream: self.subscription_stream.clone(),
169            connection_closed_signal: self
170                .connection_closed_signal
171                .as_ref()
172                .map(|s| s.resubscribe()),
173            query_hash: self.query_hash.clone(),
174            authorization: self.authorization.clone(),
175            executable_document: self.executable_document.clone(),
176            id: self.id.clone(),
177        }
178    }
179}
180
181impl SubgraphRequestId {
182    pub fn new() -> Self {
183        SubgraphRequestId(
184            uuid::Uuid::new_v4()
185                .as_hyphenated()
186                .encode_lower(&mut uuid::Uuid::encode_buffer())
187                .to_string(),
188        )
189    }
190}
191
192impl std::ops::Deref for SubgraphRequestId {
193    type Target = str;
194
195    fn deref(&self) -> &str {
196        &self.0
197    }
198}
199
200impl Default for SubgraphRequestId {
201    fn default() -> Self {
202        Self::new()
203    }
204}
205
206assert_impl_all!(Response: Send);
207#[derive(Debug)]
208#[non_exhaustive]
209pub struct Response {
210    pub response: http::Response<graphql::Response>,
211    // FIXME for router 2.x
212    /// Name of the subgraph, it's an Option to not introduce breaking change
213    pub(crate) subgraph_name: Option<String>,
214    pub context: Context,
215    /// unique id matching the corresponding field in the request
216    pub(crate) id: SubgraphRequestId,
217}
218
219#[buildstructor::buildstructor]
220impl Response {
221    /// This is the constructor to use when constructing a real Response..
222    ///
223    /// In this case, you already have a valid response and just wish to associate it with a context
224    /// and create a Response.
225    pub(crate) fn new_from_response(
226        response: http::Response<graphql::Response>,
227        context: Context,
228        subgraph_name: String,
229        id: SubgraphRequestId,
230    ) -> Response {
231        Self {
232            response,
233            context,
234            subgraph_name: Some(subgraph_name),
235            id,
236        }
237    }
238
239    /// This is the constructor (or builder) to use when constructing a real Response.
240    ///
241    /// The parameters are not optional, because in a live situation all of these properties must be
242    /// set and be correct to create a Response.
243    #[builder(visibility = "pub")]
244    fn new(
245        label: Option<String>,
246        data: Option<Value>,
247        path: Option<Path>,
248        errors: Vec<Error>,
249        extensions: Object,
250        status_code: Option<StatusCode>,
251        context: Context,
252        headers: Option<http::HeaderMap<http::HeaderValue>>,
253        subgraph_name: Option<String>,
254        id: Option<SubgraphRequestId>,
255    ) -> Response {
256        // Build a response
257        let res = graphql::Response::builder()
258            .and_label(label)
259            .data(data.unwrap_or_default())
260            .and_path(path)
261            .errors(errors)
262            .extensions(extensions)
263            .build();
264
265        // Build an http Response
266        let mut response = http::Response::builder()
267            .status(status_code.unwrap_or(StatusCode::OK))
268            .body(res)
269            .expect("Response is serializable; qed");
270
271        *response.headers_mut() = headers.unwrap_or_default();
272
273        // Warning: the id argument for this builder is an Option to make that a non breaking change
274        // but this means that if a subgraph response is created explicitely without an id, it will
275        // be generated here and not match the id from the subgraph request
276        let id = id.unwrap_or_default();
277
278        Self {
279            response,
280            context,
281            subgraph_name,
282            id,
283        }
284    }
285
286    /// This is the constructor (or builder) to use when constructing a "fake" Response.
287    ///
288    /// This does not enforce the provision of the data that is required for a fully functional
289    /// Response. It's usually enough for testing, when a fully constructed Response is
290    /// difficult to construct and not required for the purposes of the test.
291    #[builder(visibility = "pub")]
292    fn fake_new(
293        label: Option<String>,
294        data: Option<Value>,
295        path: Option<Path>,
296        errors: Vec<Error>,
297        // Skip the `Object` type alias in order to use buildstructor’s map special-casing
298        extensions: JsonMap<ByteString, Value>,
299        status_code: Option<StatusCode>,
300        context: Option<Context>,
301        headers: Option<http::HeaderMap<http::HeaderValue>>,
302        subgraph_name: Option<String>,
303        id: Option<SubgraphRequestId>,
304    ) -> Response {
305        Response::new(
306            label,
307            data,
308            path,
309            errors,
310            extensions,
311            status_code,
312            context.unwrap_or_default(),
313            headers,
314            subgraph_name,
315            id,
316        )
317    }
318
319    /// This is the constructor (or builder) to use when constructing a "fake" Response.
320    /// It differs from the existing fake_new because it allows easier passing of headers. However we can't change the original without breaking the public APIs.
321    ///
322    /// This does not enforce the provision of the data that is required for a fully functional
323    /// Response. It's usually enough for testing, when a fully constructed Response is
324    /// difficult to construct and not required for the purposes of the test.
325    #[builder(visibility = "pub")]
326    fn fake2_new(
327        label: Option<String>,
328        data: Option<Value>,
329        path: Option<Path>,
330        errors: Vec<Error>,
331        // Skip the `Object` type alias in order to use buildstructor’s map special-casing
332        extensions: JsonMap<ByteString, Value>,
333        status_code: Option<StatusCode>,
334        context: Option<Context>,
335        headers: MultiMap<TryIntoHeaderName, TryIntoHeaderValue>,
336        subgraph_name: Option<String>,
337        id: Option<SubgraphRequestId>,
338    ) -> Result<Response, BoxError> {
339        Ok(Response::new(
340            label,
341            data,
342            path,
343            errors,
344            extensions,
345            status_code,
346            context.unwrap_or_default(),
347            Some(header_map(headers)?),
348            subgraph_name,
349            id,
350        ))
351    }
352
353    /// This is the constructor (or builder) to use when constructing a Response that represents a global error.
354    /// It has no path and no response data.
355    /// This is useful for things such as authentication errors.
356    #[builder(visibility = "pub")]
357    fn error_new(
358        errors: Vec<Error>,
359        status_code: Option<StatusCode>,
360        context: Context,
361        subgraph_name: Option<String>,
362        id: Option<SubgraphRequestId>,
363    ) -> Result<Response, BoxError> {
364        Ok(Response::new(
365            Default::default(),
366            Default::default(),
367            Default::default(),
368            errors,
369            Default::default(),
370            status_code,
371            context,
372            Default::default(),
373            subgraph_name,
374            id,
375        ))
376    }
377}
378
379impl Request {
380    #[allow(dead_code)]
381    pub(crate) fn to_sha256(&self) -> String {
382        let mut hasher = Sha256::new();
383        let http_req = &self.subgraph_request;
384        hasher.update(http_req.method().as_str().as_bytes());
385
386        // To not allocate
387        let version = match http_req.version() {
388            Version::HTTP_09 => "HTTP/0.9",
389            Version::HTTP_10 => "HTTP/1.0",
390            Version::HTTP_11 => "HTTP/1.1",
391            Version::HTTP_2 => "HTTP/2.0",
392            Version::HTTP_3 => "HTTP/3.0",
393            _ => "unknown",
394        };
395        hasher.update(version.as_bytes());
396        let uri = http_req.uri();
397        if let Some(scheme) = uri.scheme() {
398            hasher.update(scheme.as_str().as_bytes());
399        }
400        if let Some(authority) = uri.authority() {
401            hasher.update(authority.as_str().as_bytes());
402        }
403        if let Some(query) = uri.query() {
404            hasher.update(query.as_bytes());
405        }
406
407        // this assumes headers are in the same order
408        for (name, value) in http_req.headers() {
409            hasher.update(name.as_str().as_bytes());
410            hasher.update(value.to_str().unwrap_or("ERROR").as_bytes());
411        }
412        if let Some(claim) = self
413            .context
414            .get_json_value(APOLLO_AUTHENTICATION_JWT_CLAIMS)
415        {
416            hasher.update(format!("{claim:?}").as_bytes());
417        }
418        let body = http_req.body();
419        if let Some(operation_name) = &body.operation_name {
420            hasher.update(operation_name.as_bytes());
421        }
422        if let Some(query) = &body.query {
423            hasher.update(query.as_bytes());
424        }
425        for (var_name, var_value) in &body.variables {
426            hasher.update(var_name.inner());
427            // TODO implement to_bytes() for value in serde_json_bytes
428            hasher.update(var_value.to_string().as_bytes());
429        }
430        for (name, val) in &body.extensions {
431            hasher.update(name.inner());
432            // TODO implement to_bytes() for value in serde_json_bytes
433            hasher.update(val.to_string().as_bytes());
434        }
435
436        hex::encode(hasher.finalize())
437    }
438}