Skip to main content

apollo_router/services/
external.rs

1//! Structures for externalised data, communicating the state of the router pipeline at the
2//! different stages.
3
4use std::collections::HashMap;
5use std::fmt::Debug;
6use std::sync::Arc;
7use std::time::Duration;
8
9use http::HeaderMap;
10use http::HeaderValue;
11use http::Method;
12use http::StatusCode;
13use http::header::ACCEPT;
14use http::header::CONTENT_TYPE;
15#[cfg(unix)]
16use hyperlocal;
17use opentelemetry::global::get_text_map_propagator;
18use schemars::JsonSchema;
19use serde::Deserialize;
20use serde::Serialize;
21use serde::de::DeserializeOwned;
22use strum::Display;
23use tower::BoxError;
24use tower::Service;
25use tracing::Instrument;
26
27use super::subgraph::SubgraphRequestId;
28use crate::Context;
29use crate::plugins::telemetry::consts::HTTP_REQUEST_SPAN_NAME;
30use crate::plugins::telemetry::otel::OpenTelemetrySpanExt;
31use crate::plugins::telemetry::reload::otel::prepare_context;
32use crate::query_planner::QueryPlan;
33use crate::services::http::HttpRequest;
34use crate::services::http::HttpResponse;
35#[cfg(unix)]
36use crate::services::parse_unix_socket_url;
37use crate::services::router;
38
/// Default externalization timeout: one second.
pub(crate) const DEFAULT_EXTERNALIZATION_TIMEOUT: Duration = Duration::from_millis(1_000);

/// Version of the externalised payload format. Bump this whenever the format changes.
pub(crate) const EXTERNALIZABLE_VERSION: u8 = 1;
43
44#[derive(Clone, Debug, Display, Deserialize, PartialEq, Serialize, JsonSchema)]
45pub(crate) enum PipelineStep {
46    RouterRequest,
47    RouterResponse,
48    SupergraphRequest,
49    SupergraphResponse,
50    ExecutionRequest,
51    ExecutionResponse,
52    SubgraphRequest,
53    SubgraphResponse,
54    ConnectorRequest,
55    ConnectorResponse,
56}
57
58impl From<PipelineStep> for opentelemetry::Value {
59    fn from(val: PipelineStep) -> Self {
60        val.to_string().into()
61    }
62}
63
64#[derive(Clone, Debug, Default, Display, Deserialize, PartialEq, Serialize, JsonSchema)]
65#[serde(rename_all = "camelCase")]
66pub(crate) enum Control {
67    #[default]
68    Continue,
69    Break(u16),
70}
71
72impl Control {
73    #[allow(dead_code)]
74    fn new(status: u16) -> Self {
75        Control::Break(status)
76    }
77
78    pub(crate) fn get_http_status(&self) -> Result<StatusCode, BoxError> {
79        match self {
80            Control::Continue => Ok(StatusCode::OK),
81            Control::Break(code) => StatusCode::from_u16(*code).map_err(|e| e.into()),
82        }
83    }
84}
85
86#[derive(Clone, Debug, Deserialize, Serialize)]
87#[serde(rename_all = "camelCase")]
88pub(crate) struct Externalizable<T> {
89    pub(crate) version: u8,
90    pub(crate) stage: String,
91    #[serde(skip_serializing_if = "Option::is_none")]
92    pub(crate) control: Option<Control>,
93    pub(crate) id: Option<String>,
94    #[serde(skip_serializing_if = "Option::is_none")]
95    pub(crate) headers: Option<HashMap<String, Vec<String>>>,
96    #[serde(skip_serializing_if = "Option::is_none")]
97    pub(crate) body: Option<T>,
98    #[serde(skip_serializing_if = "Option::is_none")]
99    pub(crate) context: Option<Context>,
100    #[serde(skip_serializing_if = "Option::is_none")]
101    pub(crate) sdl: Option<String>,
102    #[serde(skip_serializing_if = "Option::is_none")]
103    pub(crate) uri: Option<String>,
104    #[serde(skip_serializing_if = "Option::is_none")]
105    pub(crate) method: Option<String>,
106    #[serde(skip_serializing_if = "Option::is_none")]
107    pub(crate) path: Option<String>,
108    #[serde(skip_serializing_if = "Option::is_none")]
109    pub(crate) service_name: Option<String>,
110    #[serde(skip_serializing_if = "Option::is_none")]
111    pub(crate) status_code: Option<u16>,
112    #[serde(skip_serializing_if = "Option::is_none")]
113    pub(crate) has_next: Option<bool>,
114    #[serde(skip_serializing_if = "Option::is_none")]
115    query_plan: Option<Arc<QueryPlan>>,
116    #[serde(skip_serializing_if = "Option::is_none")]
117    pub(crate) subgraph_request_id: Option<SubgraphRequestId>,
118}
119
120#[buildstructor::buildstructor]
121impl<T> Externalizable<T>
122where
123    T: Debug + DeserializeOwned + Serialize + Send + Sync,
124{
125    /// This is the constructor (or builder) to use when constructing a Router
126    /// `Externalizable`.
127    #[builder(visibility = "pub(crate)")]
128    fn router_new(
129        stage: PipelineStep,
130        control: Option<Control>,
131        id: String,
132        headers: Option<HashMap<String, Vec<String>>>,
133        body: Option<T>,
134        context: Option<Context>,
135        status_code: Option<u16>,
136        method: Option<String>,
137        path: Option<String>,
138        sdl: Option<String>,
139    ) -> Self {
140        assert!(matches!(
141            stage,
142            PipelineStep::RouterRequest | PipelineStep::RouterResponse
143        ));
144        Externalizable {
145            version: EXTERNALIZABLE_VERSION,
146            stage: stage.to_string(),
147            control,
148            id: Some(id),
149            headers,
150            body,
151            context,
152            status_code,
153            sdl,
154            uri: None,
155            path,
156            method,
157            service_name: None,
158            has_next: None,
159            query_plan: None,
160            subgraph_request_id: None,
161        }
162    }
163
164    /// This is the constructor (or builder) to use when constructing a Supergraph
165    /// `Externalizable`.
166    #[builder(visibility = "pub(crate)")]
167    fn supergraph_new(
168        stage: PipelineStep,
169        control: Option<Control>,
170        id: String,
171        headers: Option<HashMap<String, Vec<String>>>,
172        body: Option<T>,
173        context: Option<Context>,
174        status_code: Option<u16>,
175        method: Option<String>,
176        sdl: Option<String>,
177        has_next: Option<bool>,
178    ) -> Self {
179        assert!(matches!(
180            stage,
181            PipelineStep::SupergraphRequest | PipelineStep::SupergraphResponse
182        ));
183        Externalizable {
184            version: EXTERNALIZABLE_VERSION,
185            stage: stage.to_string(),
186            control,
187            id: Some(id),
188            headers,
189            body,
190            context,
191            status_code,
192            sdl,
193            uri: None,
194            path: None,
195            method,
196            service_name: None,
197            has_next,
198            query_plan: None,
199            subgraph_request_id: None,
200        }
201    }
202
203    /// This is the constructor (or builder) to use when constructing an Execution
204    /// `Externalizable`.
205    #[builder(visibility = "pub(crate)")]
206    fn execution_new(
207        stage: PipelineStep,
208        control: Option<Control>,
209        id: String,
210        headers: Option<HashMap<String, Vec<String>>>,
211        body: Option<T>,
212        context: Option<Context>,
213        status_code: Option<u16>,
214        method: Option<String>,
215        sdl: Option<String>,
216        has_next: Option<bool>,
217        query_plan: Option<Arc<QueryPlan>>,
218    ) -> Self {
219        assert!(matches!(
220            stage,
221            PipelineStep::ExecutionRequest | PipelineStep::ExecutionResponse
222        ));
223        Externalizable {
224            version: EXTERNALIZABLE_VERSION,
225            stage: stage.to_string(),
226            control,
227            id: Some(id),
228            headers,
229            body,
230            context,
231            status_code,
232            sdl,
233            uri: None,
234            path: None,
235            method,
236            service_name: None,
237            has_next,
238            query_plan,
239            subgraph_request_id: None,
240        }
241    }
242
243    /// This is the constructor (or builder) to use when constructing a Subgraph
244    /// `Externalizable`.
245    #[builder(visibility = "pub(crate)")]
246    fn subgraph_new(
247        stage: PipelineStep,
248        control: Option<Control>,
249        id: String,
250        headers: Option<HashMap<String, Vec<String>>>,
251        body: Option<T>,
252        context: Option<Context>,
253        status_code: Option<u16>,
254        method: Option<String>,
255        service_name: Option<String>,
256        uri: Option<String>,
257        subgraph_request_id: Option<SubgraphRequestId>,
258    ) -> Self {
259        assert!(matches!(
260            stage,
261            PipelineStep::SubgraphRequest | PipelineStep::SubgraphResponse
262        ));
263        Externalizable {
264            version: EXTERNALIZABLE_VERSION,
265            stage: stage.to_string(),
266            control,
267            id: Some(id),
268            headers,
269            body,
270            context,
271            status_code,
272            sdl: None,
273            uri,
274            path: None,
275            method,
276            service_name,
277            has_next: None,
278            query_plan: None,
279            subgraph_request_id,
280        }
281    }
282
283    pub(crate) async fn call<C>(
284        self,
285        mut client: C,
286        uri: &str,
287        context: Context,
288    ) -> Result<Self, BoxError>
289    where
290        C: Service<HttpRequest, Response = HttpResponse, Error = BoxError>
291            + Clone
292            + Send
293            + Sync
294            + 'static,
295    {
296        tracing::debug!("forwarding json: {}", serde_json::to_string(&self)?);
297
298        // Handle Unix socket URL conversion
299        // Standard http::Uri doesn't support unix:// URLs, so we need to convert them
300        // using hyperlocal which encodes the socket path in a way the Unix connector understands
301        #[cfg(unix)]
302        let converted_uri: http::Uri = if let Some(path) = uri.strip_prefix("unix://") {
303            let (socket_path, http_path) = parse_unix_socket_url(path);
304            hyperlocal::Uri::new(socket_path, http_path).into()
305        } else {
306            uri.parse()?
307        };
308
309        #[cfg(not(unix))]
310        let converted_uri: http::Uri = uri.parse()?;
311
312        let mut http_request = http::Request::builder()
313            .uri(converted_uri)
314            .method(Method::POST)
315            .header(ACCEPT, "application/json")
316            .header(CONTENT_TYPE, "application/json")
317            .body(router::body::from_bytes(serde_json::to_vec(&self)?))?;
318
319        let schema_uri = http_request.uri();
320        let host = schema_uri.host().unwrap_or_default();
321        let port = schema_uri.port_u16().unwrap_or_else(|| {
322            let scheme = schema_uri.scheme_str();
323            if scheme == Some("https") {
324                443
325            } else if scheme == Some("http") {
326                80
327            } else {
328                0
329            }
330        });
331
332        let otel_name = format!("POST {uri}");
333        let http_req_span = tracing::info_span!(HTTP_REQUEST_SPAN_NAME,
334            "otel.kind" = "CLIENT",
335            "http.request.method" = "POST",
336            "server.address" = %host,
337            "server.port" = %port,
338            "url.full" = %uri,
339            "otel.name" = %otel_name,
340            "otel.original_name" = "http_request",
341        );
342
343        get_text_map_propagator(|propagator| {
344            propagator.inject_context(
345                &prepare_context(http_req_span.context()),
346                &mut crate::otel_compat::HeaderInjector(http_request.headers_mut()),
347            );
348        });
349
350        let request = HttpRequest {
351            http_request,
352            context,
353        };
354
355        let response = client.call(request).instrument(http_req_span).await?;
356        router::body::into_bytes(response.http_response.into_body())
357            .await
358            .map_err(BoxError::from)
359            .and_then(|bytes| serde_json::from_slice(&bytes).map_err(BoxError::from))
360    }
361
362    /// This is the constructor (or builder) to use when constructing a Connector
363    /// `Externalizable`.
364    #[builder(visibility = "pub(crate)")]
365    fn connector_new(
366        stage: PipelineStep,
367        control: Option<Control>,
368        id: String,
369        headers: Option<HashMap<String, Vec<String>>>,
370        body: Option<T>,
371        context: Option<Context>,
372        status_code: Option<u16>,
373        method: Option<String>,
374        service_name: Option<String>,
375        uri: Option<String>,
376    ) -> Self {
377        assert!(matches!(
378            stage,
379            PipelineStep::ConnectorRequest | PipelineStep::ConnectorResponse
380        ));
381        Externalizable {
382            version: EXTERNALIZABLE_VERSION,
383            stage: stage.to_string(),
384            control,
385            id: Some(id),
386            headers,
387            body,
388            context,
389            status_code,
390            sdl: None,
391            uri,
392            path: None,
393            method,
394            service_name,
395            has_next: None,
396            query_plan: None,
397            subgraph_request_id: None,
398        }
399    }
400}
401
402/// Convert a HeaderMap into a HashMap
403pub(crate) fn externalize_header_map(
404    input: &HeaderMap<HeaderValue>,
405) -> HashMap<String, Vec<String>> {
406    let mut output = HashMap::with_capacity(input.keys_len());
407    for (k, v) in input {
408        let k = k.as_str().to_owned();
409        match String::from_utf8(v.as_bytes().to_vec()) {
410            Ok(v) => output.entry(k).or_insert_with(Vec::new).push(v),
411            Err(e) => tracing::warn!(
412                "unable to convert header value to utf-8 for {}, will not be sent to coprocessor: {}",
413                k,
414                e
415            ),
416        }
417    }
418    output
419}
420
#[cfg(test)]
mod test {
    use http::Response;
    use tower::service_fn;
    use tracing_futures::WithSubscriber;

    use super::*;
    use crate::assert_snapshot_subscriber;
    use crate::test_harness::tracing_test;

    /// Asserts that running `build` panics.
    ///
    /// Each mismatched stage is checked through its own call so that a panic on the first
    /// stage cannot hide a missing panic on the second one (which is what happens when two
    /// builder calls share a single `#[should_panic]` test body).
    fn assert_build_panics(build: impl FnOnce() + std::panic::UnwindSafe) {
        assert!(
            std::panic::catch_unwind(build).is_err(),
            "building an Externalizable with a mismatched stage should panic"
        );
    }

    #[test]
    fn it_will_build_router_externalizable_correctly() {
        Externalizable::<String>::router_builder()
            .stage(PipelineStep::RouterRequest)
            .id(String::default())
            .build();
        Externalizable::<String>::router_builder()
            .stage(PipelineStep::RouterResponse)
            .id(String::default())
            .build();
    }

    #[test]
    fn it_will_not_build_router_externalizable_incorrectly() {
        assert_build_panics(|| {
            Externalizable::<String>::router_builder()
                .stage(PipelineStep::SubgraphRequest)
                .id(String::default())
                .build();
        });
        assert_build_panics(|| {
            Externalizable::<String>::router_builder()
                .stage(PipelineStep::SubgraphResponse)
                .id(String::default())
                .build();
        });
    }

    #[test]
    fn it_will_not_build_router_externalizable_incorrectl_supergraph() {
        assert_build_panics(|| {
            Externalizable::<String>::router_builder()
                .stage(PipelineStep::SupergraphRequest)
                .id(String::default())
                .build();
        });
        assert_build_panics(|| {
            Externalizable::<String>::router_builder()
                .stage(PipelineStep::SupergraphResponse)
                .id(String::default())
                .build();
        });
    }

    #[test]
    fn it_will_build_subgraph_externalizable_correctly() {
        Externalizable::<String>::subgraph_builder()
            .stage(PipelineStep::SubgraphRequest)
            .id(String::default())
            .build();
        Externalizable::<String>::subgraph_builder()
            .stage(PipelineStep::SubgraphResponse)
            .id(String::default())
            .build();
    }

    #[test]
    fn it_will_not_build_subgraph_externalizable_incorrectly() {
        assert_build_panics(|| {
            Externalizable::<String>::subgraph_builder()
                .stage(PipelineStep::RouterRequest)
                .id(String::default())
                .build();
        });
        assert_build_panics(|| {
            Externalizable::<String>::subgraph_builder()
                .stage(PipelineStep::RouterResponse)
                .id(String::default())
                .build();
        });
    }

    #[test]
    fn it_will_externalize_headers_correctly() {
        let _guard = tracing_test::dispatcher_guard();

        let mut headers = HeaderMap::new();
        headers.insert("content-type", HeaderValue::from_static("application/json"));
        // Hyper uses this function internally to create HeaderValue structs
        // SAFETY: every byte is >= 0x20 and is not DEL, so the bytes are acceptable header
        // value octets; they are only invalid as UTF-8, which is what this test needs.
        headers.insert("x-test-header", unsafe {
            HeaderValue::from_maybe_shared_unchecked(b"invalid\xc0\xaf")
        });

        let externalized = externalize_header_map(&headers);

        // x-test-header should be dropped because it is not valid UTF-8
        assert_eq!(
            externalized,
            HashMap::from([(
                "content-type".to_string(),
                vec!["application/json".to_string()]
            )])
        );

        assert!(tracing_test::logs_contain(
            "unable to convert header value to utf-8 for x-test-header, will not be sent to coprocessor: invalid utf-8 sequence of 1 bytes from index 7"
        ));
    }

    #[tokio::test]
    async fn it_will_create_an_http_request_span() {
        use crate::services::http::HttpRequest;
        use crate::services::http::HttpResponse;

        async {
            // The mock service emits an event so the snapshot captures the
            // surrounding http_request span fields set by Externalizable::call.
            let service = service_fn(|req: HttpRequest| async move {
                tracing::info!("got request");
                Ok::<_, BoxError>(HttpResponse {
                    http_response: Response::builder()
                        .status(200)
                        .body(router::body::from_bytes(
                            serde_json::to_vec(&serde_json::json!({
                                "version": EXTERNALIZABLE_VERSION,
                                "stage": "RouterRequest",
                                "control": "continue",
                                "id": "test-id",
                            }))
                            .unwrap(),
                        ))
                        .unwrap(),
                    context: req.context,
                })
            });

            let externalizable = Externalizable::<String>::router_builder()
                .stage(PipelineStep::RouterRequest)
                .id("test-id".to_string())
                .build();

            // call() creates an http_request span with OTel attributes
            let _ = externalizable
                .call(service, "http://example.com/test", Context::new())
                .await;
        }
        .with_subscriber(assert_snapshot_subscriber!())
        .await;
    }
}