apollo_router/services/
external.rs

1//! Structures for externalised data, communicating the state of the router pipeline at the
2//! different stages.
3
4use std::collections::HashMap;
5use std::fmt::Debug;
6use std::sync::Arc;
7use std::time::Duration;
8
9use http::HeaderMap;
10use http::HeaderValue;
11use http::Method;
12use http::StatusCode;
13use http::header::ACCEPT;
14use http::header::CONTENT_TYPE;
15use opentelemetry::global::get_text_map_propagator;
16use schemars::JsonSchema;
17use serde::Deserialize;
18use serde::Serialize;
19use serde::de::DeserializeOwned;
20use strum_macros::Display;
21use tower::BoxError;
22use tower::Service;
23use tracing::Instrument;
24
25use super::subgraph::SubgraphRequestId;
26use crate::Context;
27use crate::plugins::telemetry::consts::HTTP_REQUEST_SPAN_NAME;
28use crate::plugins::telemetry::otel::OpenTelemetrySpanExt;
29use crate::plugins::telemetry::reload::prepare_context;
30use crate::query_planner::QueryPlan;
31use crate::services::router;
32use crate::services::router::body::RouterBody;
33
/// Default timeout for an externalization call: one second.
// NOTE(review): not referenced in this file; presumably callers fall back to it
// when no timeout is configured — confirm against the call sites.
pub(crate) const DEFAULT_EXTERNALIZATION_TIMEOUT: Duration = Duration::from_secs(1);

/// Version of our externalised data. Rev this if it changes.
///
/// Every `Externalizable` constructor in this module stores this value in the
/// payload's `version` field.
pub(crate) const EXTERNALIZABLE_VERSION: u8 = 1;
38
/// The stage of the router pipeline an externalized payload belongs to.
///
/// Each pipeline service (router, supergraph, execution, subgraph) has a
/// request and a response step. The derived `Display` implementation is what
/// the `Externalizable` constructors use (via `to_string`) to fill in the
/// payload's `stage` field.
#[derive(Clone, Debug, Display, Deserialize, PartialEq, Serialize, JsonSchema)]
pub(crate) enum PipelineStep {
    /// Request step of the router service.
    RouterRequest,
    /// Response step of the router service.
    RouterResponse,
    /// Request step of the supergraph service.
    SupergraphRequest,
    /// Response step of the supergraph service.
    SupergraphResponse,
    /// Request step of the execution service.
    ExecutionRequest,
    /// Response step of the execution service.
    ExecutionResponse,
    /// Request step of the subgraph service.
    SubgraphRequest,
    /// Response step of the subgraph service.
    SubgraphResponse,
}
50
51impl From<PipelineStep> for opentelemetry::Value {
52    fn from(val: PipelineStep) -> Self {
53        val.to_string().into()
54    }
55}
56
/// Flow-control instruction carried in an externalized payload.
///
/// Variant names are serialized in camelCase (see the `rename_all` attribute).
#[derive(Clone, Debug, Default, Display, Deserialize, PartialEq, Serialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub(crate) enum Control {
    /// Default variant; `get_http_status` maps it to `200 OK`.
    #[default]
    Continue,
    /// Carries a numeric code that `get_http_status` interprets as an HTTP
    /// status code.
    Break(u16),
}
64
65impl Control {
66    #[allow(dead_code)]
67    fn new(status: u16) -> Self {
68        Control::Break(status)
69    }
70
71    pub(crate) fn get_http_status(&self) -> Result<StatusCode, BoxError> {
72        match self {
73            Control::Continue => Ok(StatusCode::OK),
74            Control::Break(code) => StatusCode::from_u16(*code).map_err(|e| e.into()),
75        }
76    }
77}
78
/// Externalized snapshot of a pipeline request or response.
///
/// Field names are serialized in camelCase. Every optional field except `id`
/// is left out of the serialized output when it is `None`. Which fields are
/// populated depends on the constructor used (router, supergraph, execution or
/// subgraph); fields a constructor does not accept are set to `None`.
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub(crate) struct Externalizable<T> {
    /// Payload format version; constructors set it to `EXTERNALIZABLE_VERSION`.
    pub(crate) version: u8,
    /// Pipeline stage, stored as the string rendering of a `PipelineStep`.
    pub(crate) stage: String,
    /// Optional flow-control instruction.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub(crate) control: Option<Control>,
    /// Identifier of the payload. Constructors always store `Some(id)`; unlike
    /// the other optional fields it has no `skip_serializing_if`, so it is
    /// serialized even when `None`.
    pub(crate) id: Option<String>,
    /// Headers as a map from header name to the list of values for that name.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub(crate) headers: Option<HashMap<String, Vec<String>>>,
    /// Body of the request or response, of the caller-chosen type `T`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub(crate) body: Option<T>,
    /// Router `Context` attached to the payload.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub(crate) context: Option<Context>,
    /// SDL string; accepted by the router, supergraph and execution
    /// constructors, always `None` for subgraph payloads.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub(crate) sdl: Option<String>,
    /// URI; only the subgraph constructor accepts it.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub(crate) uri: Option<String>,
    /// HTTP method; accepted by every constructor.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub(crate) method: Option<String>,
    /// Path; only the router constructor accepts it.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub(crate) path: Option<String>,
    /// Service name; only the subgraph constructor accepts it.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub(crate) service_name: Option<String>,
    /// Numeric status code; accepted by every constructor.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub(crate) status_code: Option<u16>,
    /// "Has next" flag; accepted by the supergraph and execution constructors.
    // NOTE(review): presumably marks that more response chunks follow — confirm.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub(crate) has_next: Option<bool>,
    /// Query plan; only the execution constructor accepts it. Private, so it
    /// can only be set through that constructor (or deserialization).
    #[serde(skip_serializing_if = "Option::is_none")]
    query_plan: Option<Arc<QueryPlan>>,
    /// Subgraph request id; only the subgraph constructor accepts it.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub(crate) subgraph_request_id: Option<SubgraphRequestId>,
}
112
113#[buildstructor::buildstructor]
114impl<T> Externalizable<T>
115where
116    T: Debug + DeserializeOwned + Serialize + Send + Sync,
117{
118    /// This is the constructor (or builder) to use when constructing a Router
119    /// `Externalizable`.
120    #[builder(visibility = "pub(crate)")]
121    fn router_new(
122        stage: PipelineStep,
123        control: Option<Control>,
124        id: String,
125        headers: Option<HashMap<String, Vec<String>>>,
126        body: Option<T>,
127        context: Option<Context>,
128        status_code: Option<u16>,
129        method: Option<String>,
130        path: Option<String>,
131        sdl: Option<String>,
132    ) -> Self {
133        assert!(matches!(
134            stage,
135            PipelineStep::RouterRequest | PipelineStep::RouterResponse
136        ));
137        Externalizable {
138            version: EXTERNALIZABLE_VERSION,
139            stage: stage.to_string(),
140            control,
141            id: Some(id),
142            headers,
143            body,
144            context,
145            status_code,
146            sdl,
147            uri: None,
148            path,
149            method,
150            service_name: None,
151            has_next: None,
152            query_plan: None,
153            subgraph_request_id: None,
154        }
155    }
156
157    /// This is the constructor (or builder) to use when constructing a Supergraph
158    /// `Externalizable`.
159    #[builder(visibility = "pub(crate)")]
160    fn supergraph_new(
161        stage: PipelineStep,
162        control: Option<Control>,
163        id: String,
164        headers: Option<HashMap<String, Vec<String>>>,
165        body: Option<T>,
166        context: Option<Context>,
167        status_code: Option<u16>,
168        method: Option<String>,
169        sdl: Option<String>,
170        has_next: Option<bool>,
171    ) -> Self {
172        assert!(matches!(
173            stage,
174            PipelineStep::SupergraphRequest | PipelineStep::SupergraphResponse
175        ));
176        Externalizable {
177            version: EXTERNALIZABLE_VERSION,
178            stage: stage.to_string(),
179            control,
180            id: Some(id),
181            headers,
182            body,
183            context,
184            status_code,
185            sdl,
186            uri: None,
187            path: None,
188            method,
189            service_name: None,
190            has_next,
191            query_plan: None,
192            subgraph_request_id: None,
193        }
194    }
195
196    /// This is the constructor (or builder) to use when constructing an Execution
197    /// `Externalizable`.
198    #[builder(visibility = "pub(crate)")]
199    fn execution_new(
200        stage: PipelineStep,
201        control: Option<Control>,
202        id: String,
203        headers: Option<HashMap<String, Vec<String>>>,
204        body: Option<T>,
205        context: Option<Context>,
206        status_code: Option<u16>,
207        method: Option<String>,
208        sdl: Option<String>,
209        has_next: Option<bool>,
210        query_plan: Option<Arc<QueryPlan>>,
211    ) -> Self {
212        assert!(matches!(
213            stage,
214            PipelineStep::ExecutionRequest | PipelineStep::ExecutionResponse
215        ));
216        Externalizable {
217            version: EXTERNALIZABLE_VERSION,
218            stage: stage.to_string(),
219            control,
220            id: Some(id),
221            headers,
222            body,
223            context,
224            status_code,
225            sdl,
226            uri: None,
227            path: None,
228            method,
229            service_name: None,
230            has_next,
231            query_plan,
232            subgraph_request_id: None,
233        }
234    }
235
236    /// This is the constructor (or builder) to use when constructing a Subgraph
237    /// `Externalizable`.
238    #[builder(visibility = "pub(crate)")]
239    fn subgraph_new(
240        stage: PipelineStep,
241        control: Option<Control>,
242        id: String,
243        headers: Option<HashMap<String, Vec<String>>>,
244        body: Option<T>,
245        context: Option<Context>,
246        status_code: Option<u16>,
247        method: Option<String>,
248        service_name: Option<String>,
249        uri: Option<String>,
250        subgraph_request_id: Option<SubgraphRequestId>,
251    ) -> Self {
252        assert!(matches!(
253            stage,
254            PipelineStep::SubgraphRequest | PipelineStep::SubgraphResponse
255        ));
256        Externalizable {
257            version: EXTERNALIZABLE_VERSION,
258            stage: stage.to_string(),
259            control,
260            id: Some(id),
261            headers,
262            body,
263            context,
264            status_code,
265            sdl: None,
266            uri,
267            path: None,
268            method,
269            service_name,
270            has_next: None,
271            query_plan: None,
272            subgraph_request_id,
273        }
274    }
275
276    pub(crate) async fn call<C>(self, mut client: C, uri: &str) -> Result<Self, BoxError>
277    where
278        C: Service<
279                http::Request<RouterBody>,
280                Response = http::Response<RouterBody>,
281                Error = BoxError,
282            > + Clone
283            + Send
284            + Sync
285            + 'static,
286    {
287        tracing::debug!("forwarding json: {}", serde_json::to_string(&self)?);
288
289        let mut request = http::Request::builder()
290            .uri(uri)
291            .method(Method::POST)
292            .header(ACCEPT, "application/json")
293            .header(CONTENT_TYPE, "application/json")
294            .body(router::body::from_bytes(serde_json::to_vec(&self)?))?;
295
296        let schema_uri = request.uri();
297        let host = schema_uri.host().unwrap_or_default();
298        let port = schema_uri.port_u16().unwrap_or_else(|| {
299            let scheme = schema_uri.scheme_str();
300            if scheme == Some("https") {
301                443
302            } else if scheme == Some("http") {
303                80
304            } else {
305                0
306            }
307        });
308        let otel_name = format!("POST {schema_uri}");
309
310        let http_req_span = tracing::info_span!(HTTP_REQUEST_SPAN_NAME,
311            "otel.kind" = "CLIENT",
312            "http.request.method" = "POST",
313            "server.address" = %host,
314            "server.port" = %port,
315            "url.full" = %schema_uri,
316            "otel.name" = %otel_name,
317            "otel.original_name" = "http_request",
318        );
319
320        get_text_map_propagator(|propagator| {
321            propagator.inject_context(
322                &prepare_context(http_req_span.context()),
323                &mut crate::otel_compat::HeaderInjector(request.headers_mut()),
324            );
325        });
326
327        let response = client.call(request).instrument(http_req_span).await?;
328        router::body::into_bytes(response.into_body())
329            .await
330            .map_err(BoxError::from)
331            .and_then(|bytes| serde_json::from_slice(&bytes).map_err(BoxError::from))
332    }
333}
334
335/// Convert a HeaderMap into a HashMap
336pub(crate) fn externalize_header_map(
337    input: &HeaderMap<HeaderValue>,
338) -> Result<HashMap<String, Vec<String>>, BoxError> {
339    let mut output = HashMap::new();
340    for (k, v) in input {
341        let k = k.as_str().to_owned();
342        let v = String::from_utf8(v.as_bytes().to_vec()).map_err(|e| e.to_string())?;
343        output.entry(k).or_insert_with(Vec::new).push(v)
344    }
345    Ok(output)
346}
347
#[cfg(test)]
mod test {
    use http::Response;
    use tower::service_fn;
    use tracing_futures::WithSubscriber;

    use super::*;
    use crate::assert_snapshot_subscriber;

    // NOTE: every rejected stage gets its own `#[should_panic]` test. A test
    // stops at its first panic, so putting two builds in one test would leave
    // the second stage unverified.

    #[test]
    fn it_will_build_router_externalizable_correctly() {
        Externalizable::<String>::router_builder()
            .stage(PipelineStep::RouterRequest)
            .id(String::default())
            .build();
        Externalizable::<String>::router_builder()
            .stage(PipelineStep::RouterResponse)
            .id(String::default())
            .build();
    }

    #[test]
    #[should_panic]
    fn it_will_not_build_router_externalizable_incorrectly() {
        Externalizable::<String>::router_builder()
            .stage(PipelineStep::SubgraphRequest)
            .id(String::default())
            .build();
    }

    #[test]
    #[should_panic]
    fn it_will_not_build_router_externalizable_incorrectly_subgraph_response() {
        Externalizable::<String>::router_builder()
            .stage(PipelineStep::SubgraphResponse)
            .id(String::default())
            .build();
    }

    #[test]
    #[should_panic]
    fn it_will_not_build_router_externalizable_incorrectly_supergraph() {
        Externalizable::<String>::router_builder()
            .stage(PipelineStep::SupergraphRequest)
            .id(String::default())
            .build();
    }

    #[test]
    #[should_panic]
    fn it_will_not_build_router_externalizable_incorrectly_supergraph_response() {
        Externalizable::<String>::router_builder()
            .stage(PipelineStep::SupergraphResponse)
            .id(String::default())
            .build();
    }

    #[test]
    fn it_will_build_subgraph_externalizable_correctly() {
        Externalizable::<String>::subgraph_builder()
            .stage(PipelineStep::SubgraphRequest)
            .id(String::default())
            .build();
        Externalizable::<String>::subgraph_builder()
            .stage(PipelineStep::SubgraphResponse)
            .id(String::default())
            .build();
    }

    #[test]
    #[should_panic]
    fn it_will_not_build_subgraph_externalizable_incorrectly() {
        Externalizable::<String>::subgraph_builder()
            .stage(PipelineStep::RouterRequest)
            .id(String::default())
            .build();
    }

    #[test]
    #[should_panic]
    fn it_will_not_build_subgraph_externalizable_incorrectly_router_response() {
        Externalizable::<String>::subgraph_builder()
            .stage(PipelineStep::RouterResponse)
            .id(String::default())
            .build();
    }

    #[tokio::test]
    async fn it_will_create_an_http_request_span() {
        async {
            // Create a mock service that returns a simple response
            let service = service_fn(|_req: http::Request<RouterBody>| async {
                tracing::info!("got request");
                Ok::<_, BoxError>(
                    Response::builder()
                        .status(200)
                        .body(router::body::from_bytes(vec![]))
                        .unwrap(),
                )
            });

            // Create an externalizable request
            let externalizable = Externalizable::<String>::router_builder()
                .stage(PipelineStep::RouterRequest)
                .id("test-id".to_string())
                .build();

            // Make the call which should create the HTTP request span.
            // The result is deliberately ignored: only the emitted span is
            // checked by the snapshot subscriber.
            let _ = externalizable
                .call(service, "http://example.com/test")
                .await;
        }
        .with_subscriber(assert_snapshot_subscriber!())
        .await;
    }
}