1use std::collections::HashMap;
5use std::fmt::Debug;
6use std::sync::Arc;
7use std::time::Duration;
8
9use http::HeaderMap;
10use http::HeaderValue;
11use http::Method;
12use http::StatusCode;
13use http::header::ACCEPT;
14use http::header::CONTENT_TYPE;
15use opentelemetry::global::get_text_map_propagator;
16use schemars::JsonSchema;
17use serde::Deserialize;
18use serde::Serialize;
19use serde::de::DeserializeOwned;
20use strum_macros::Display;
21use tower::BoxError;
22use tower::Service;
23use tracing::Instrument;
24
25use super::subgraph::SubgraphRequestId;
26use crate::Context;
27use crate::plugins::telemetry::consts::HTTP_REQUEST_SPAN_NAME;
28use crate::plugins::telemetry::otel::OpenTelemetrySpanExt;
29use crate::plugins::telemetry::reload::prepare_context;
30use crate::query_planner::QueryPlan;
31use crate::services::router;
32use crate::services::router::body::RouterBody;
33
/// Default externalization timeout: one second.
///
/// NOTE(review): not referenced in this part of the file; presumably applied where
/// the HTTP client used for externalization is configured. Confirm at the use site.
pub(crate) const DEFAULT_EXTERNALIZATION_TIMEOUT: Duration = Duration::from_secs(1);

/// Version number written into the `version` field of every `Externalizable`
/// built by the constructors in this file.
pub(crate) const EXTERNALIZABLE_VERSION: u8 = 1;
38
// Identifies the pipeline stage an `Externalizable` payload belongs to.
//
// The `Display` rendering of a variant is what the constructors store in
// `Externalizable::stage` (via `stage.to_string()`).
//
// NOTE: plain `//` comments are used deliberately. This type derives `JsonSchema`,
// and `///` doc comments would be emitted as descriptions in the generated schema.
#[derive(Clone, Debug, Display, Deserialize, PartialEq, Serialize, JsonSchema)]
pub(crate) enum PipelineStep {
    // Accepted by `Externalizable::router_new`.
    RouterRequest,
    RouterResponse,
    // Accepted by `Externalizable::supergraph_new`.
    SupergraphRequest,
    SupergraphResponse,
    // Accepted by `Externalizable::execution_new`.
    ExecutionRequest,
    ExecutionResponse,
    // Accepted by `Externalizable::subgraph_new`.
    SubgraphRequest,
    SubgraphResponse,
}
50
51impl From<PipelineStep> for opentelemetry::Value {
52 fn from(val: PipelineStep) -> Self {
53 val.to_string().into()
54 }
55}
56
// Control directive carried in the `control` field of an `Externalizable`.
//
// `rename_all = "camelCase"` makes serde use the names `continue` and `break` for
// the variants.
//
// NOTE: plain `//` comments are used deliberately. This type derives `JsonSchema`,
// and `///` doc comments would be emitted as descriptions in the generated schema.
#[derive(Clone, Debug, Default, Display, Deserialize, PartialEq, Serialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub(crate) enum Control {
    // The default value; `get_http_status` maps it to `200 OK`.
    #[default]
    Continue,
    // Carries a raw status code that `get_http_status` converts to a `StatusCode`.
    Break(u16),
}
64
65impl Control {
66 #[allow(dead_code)]
67 fn new(status: u16) -> Self {
68 Control::Break(status)
69 }
70
71 pub(crate) fn get_http_status(&self) -> Result<StatusCode, BoxError> {
72 match self {
73 Control::Continue => Ok(StatusCode::OK),
74 Control::Break(code) => StatusCode::from_u16(*code).map_err(|e| e.into()),
75 }
76 }
77}
78
/// JSON payload exchanged by [`Externalizable::call`]: it is serialized into the
/// request body and the response body is deserialized back into the same type.
///
/// Field names are serialized in camelCase (for example `statusCode`, `hasNext`).
/// Optional fields marked `skip_serializing_if` are omitted from the output when
/// they are `None`.
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub(crate) struct Externalizable<T> {
    /// Payload format version; the constructors set it to `EXTERNALIZABLE_VERSION`.
    pub(crate) version: u8,
    /// `Display` rendering of the `PipelineStep` the payload was built for.
    pub(crate) stage: String,
    /// Optional control directive (see `Control`).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub(crate) control: Option<Control>,
    /// Identifier; the constructors always set it to `Some(id)`.
    ///
    /// Unlike the other optional fields it has no `skip_serializing_if`, so a
    /// `None` is still written out rather than omitted.
    pub(crate) id: Option<String>,
    /// Headers as `name -> values`, the shape produced by `externalize_header_map`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub(crate) headers: Option<HashMap<String, Vec<String>>>,
    /// Stage-specific body; its type is chosen by the caller through `T`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub(crate) body: Option<T>,
    /// Optional request `Context`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub(crate) context: Option<Context>,
    /// Settable through the router, supergraph and execution constructors; the
    /// subgraph constructor always leaves it `None`.
    /// NOTE(review): presumably the schema SDL — confirm against callers.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub(crate) sdl: Option<String>,
    /// Only settable through the subgraph constructor.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub(crate) uri: Option<String>,
    /// Settable through every constructor.
    /// NOTE(review): presumably the HTTP method name — confirm against callers.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub(crate) method: Option<String>,
    /// Only settable through the router constructor.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub(crate) path: Option<String>,
    /// Only settable through the subgraph constructor.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub(crate) service_name: Option<String>,
    /// Settable through every constructor.
    /// NOTE(review): presumably an HTTP status code — confirm against callers.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub(crate) status_code: Option<u16>,
    /// Only settable through the supergraph and execution constructors.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub(crate) has_next: Option<bool>,
    /// Only settable through the execution constructor; private to this module.
    #[serde(skip_serializing_if = "Option::is_none")]
    query_plan: Option<Arc<QueryPlan>>,
    /// Only settable through the subgraph constructor.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub(crate) subgraph_request_id: Option<SubgraphRequestId>,
}
112
113#[buildstructor::buildstructor]
114impl<T> Externalizable<T>
115where
116 T: Debug + DeserializeOwned + Serialize + Send + Sync,
117{
118 #[builder(visibility = "pub(crate)")]
121 fn router_new(
122 stage: PipelineStep,
123 control: Option<Control>,
124 id: String,
125 headers: Option<HashMap<String, Vec<String>>>,
126 body: Option<T>,
127 context: Option<Context>,
128 status_code: Option<u16>,
129 method: Option<String>,
130 path: Option<String>,
131 sdl: Option<String>,
132 ) -> Self {
133 assert!(matches!(
134 stage,
135 PipelineStep::RouterRequest | PipelineStep::RouterResponse
136 ));
137 Externalizable {
138 version: EXTERNALIZABLE_VERSION,
139 stage: stage.to_string(),
140 control,
141 id: Some(id),
142 headers,
143 body,
144 context,
145 status_code,
146 sdl,
147 uri: None,
148 path,
149 method,
150 service_name: None,
151 has_next: None,
152 query_plan: None,
153 subgraph_request_id: None,
154 }
155 }
156
157 #[builder(visibility = "pub(crate)")]
160 fn supergraph_new(
161 stage: PipelineStep,
162 control: Option<Control>,
163 id: String,
164 headers: Option<HashMap<String, Vec<String>>>,
165 body: Option<T>,
166 context: Option<Context>,
167 status_code: Option<u16>,
168 method: Option<String>,
169 sdl: Option<String>,
170 has_next: Option<bool>,
171 ) -> Self {
172 assert!(matches!(
173 stage,
174 PipelineStep::SupergraphRequest | PipelineStep::SupergraphResponse
175 ));
176 Externalizable {
177 version: EXTERNALIZABLE_VERSION,
178 stage: stage.to_string(),
179 control,
180 id: Some(id),
181 headers,
182 body,
183 context,
184 status_code,
185 sdl,
186 uri: None,
187 path: None,
188 method,
189 service_name: None,
190 has_next,
191 query_plan: None,
192 subgraph_request_id: None,
193 }
194 }
195
196 #[builder(visibility = "pub(crate)")]
199 fn execution_new(
200 stage: PipelineStep,
201 control: Option<Control>,
202 id: String,
203 headers: Option<HashMap<String, Vec<String>>>,
204 body: Option<T>,
205 context: Option<Context>,
206 status_code: Option<u16>,
207 method: Option<String>,
208 sdl: Option<String>,
209 has_next: Option<bool>,
210 query_plan: Option<Arc<QueryPlan>>,
211 ) -> Self {
212 assert!(matches!(
213 stage,
214 PipelineStep::ExecutionRequest | PipelineStep::ExecutionResponse
215 ));
216 Externalizable {
217 version: EXTERNALIZABLE_VERSION,
218 stage: stage.to_string(),
219 control,
220 id: Some(id),
221 headers,
222 body,
223 context,
224 status_code,
225 sdl,
226 uri: None,
227 path: None,
228 method,
229 service_name: None,
230 has_next,
231 query_plan,
232 subgraph_request_id: None,
233 }
234 }
235
236 #[builder(visibility = "pub(crate)")]
239 fn subgraph_new(
240 stage: PipelineStep,
241 control: Option<Control>,
242 id: String,
243 headers: Option<HashMap<String, Vec<String>>>,
244 body: Option<T>,
245 context: Option<Context>,
246 status_code: Option<u16>,
247 method: Option<String>,
248 service_name: Option<String>,
249 uri: Option<String>,
250 subgraph_request_id: Option<SubgraphRequestId>,
251 ) -> Self {
252 assert!(matches!(
253 stage,
254 PipelineStep::SubgraphRequest | PipelineStep::SubgraphResponse
255 ));
256 Externalizable {
257 version: EXTERNALIZABLE_VERSION,
258 stage: stage.to_string(),
259 control,
260 id: Some(id),
261 headers,
262 body,
263 context,
264 status_code,
265 sdl: None,
266 uri,
267 path: None,
268 method,
269 service_name,
270 has_next: None,
271 query_plan: None,
272 subgraph_request_id,
273 }
274 }
275
276 pub(crate) async fn call<C>(self, mut client: C, uri: &str) -> Result<Self, BoxError>
277 where
278 C: Service<
279 http::Request<RouterBody>,
280 Response = http::Response<RouterBody>,
281 Error = BoxError,
282 > + Clone
283 + Send
284 + Sync
285 + 'static,
286 {
287 tracing::debug!("forwarding json: {}", serde_json::to_string(&self)?);
288
289 let mut request = http::Request::builder()
290 .uri(uri)
291 .method(Method::POST)
292 .header(ACCEPT, "application/json")
293 .header(CONTENT_TYPE, "application/json")
294 .body(router::body::from_bytes(serde_json::to_vec(&self)?))?;
295
296 let schema_uri = request.uri();
297 let host = schema_uri.host().unwrap_or_default();
298 let port = schema_uri.port_u16().unwrap_or_else(|| {
299 let scheme = schema_uri.scheme_str();
300 if scheme == Some("https") {
301 443
302 } else if scheme == Some("http") {
303 80
304 } else {
305 0
306 }
307 });
308 let otel_name = format!("POST {schema_uri}");
309
310 let http_req_span = tracing::info_span!(HTTP_REQUEST_SPAN_NAME,
311 "otel.kind" = "CLIENT",
312 "http.request.method" = "POST",
313 "server.address" = %host,
314 "server.port" = %port,
315 "url.full" = %schema_uri,
316 "otel.name" = %otel_name,
317 "otel.original_name" = "http_request",
318 );
319
320 get_text_map_propagator(|propagator| {
321 propagator.inject_context(
322 &prepare_context(http_req_span.context()),
323 &mut crate::otel_compat::HeaderInjector(request.headers_mut()),
324 );
325 });
326
327 let response = client.call(request).instrument(http_req_span).await?;
328 router::body::into_bytes(response.into_body())
329 .await
330 .map_err(BoxError::from)
331 .and_then(|bytes| serde_json::from_slice(&bytes).map_err(BoxError::from))
332 }
333}
334
335pub(crate) fn externalize_header_map(
337 input: &HeaderMap<HeaderValue>,
338) -> Result<HashMap<String, Vec<String>>, BoxError> {
339 let mut output = HashMap::new();
340 for (k, v) in input {
341 let k = k.as_str().to_owned();
342 let v = String::from_utf8(v.as_bytes().to_vec()).map_err(|e| e.to_string())?;
343 output.entry(k).or_insert_with(Vec::new).push(v)
344 }
345 Ok(output)
346}
347
#[cfg(test)]
mod test {
    use http::Response;
    use tower::service_fn;
    use tracing_futures::WithSubscriber;

    use super::*;
    use crate::assert_snapshot_subscriber;

    // Each `#[should_panic]` test below holds exactly one builder call: a test
    // stops at its first panic, so a second call in the same test would never run.

    #[test]
    fn it_will_build_router_externalizable_correctly() {
        Externalizable::<String>::router_builder()
            .stage(PipelineStep::RouterRequest)
            .id(String::default())
            .build();
        Externalizable::<String>::router_builder()
            .stage(PipelineStep::RouterResponse)
            .id(String::default())
            .build();
    }

    #[test]
    #[should_panic]
    fn it_will_not_build_router_externalizable_incorrectly() {
        Externalizable::<String>::router_builder()
            .stage(PipelineStep::SubgraphRequest)
            .id(String::default())
            .build();
    }

    #[test]
    #[should_panic]
    fn it_will_not_build_router_externalizable_incorrectly_subgraph_response() {
        Externalizable::<String>::router_builder()
            .stage(PipelineStep::SubgraphResponse)
            .id(String::default())
            .build();
    }

    #[test]
    #[should_panic]
    fn it_will_not_build_router_externalizable_incorrectl_supergraph() {
        Externalizable::<String>::router_builder()
            .stage(PipelineStep::SupergraphRequest)
            .id(String::default())
            .build();
    }

    #[test]
    #[should_panic]
    fn it_will_not_build_router_externalizable_incorrectly_supergraph_response() {
        Externalizable::<String>::router_builder()
            .stage(PipelineStep::SupergraphResponse)
            .id(String::default())
            .build();
    }

    #[test]
    fn it_will_build_subgraph_externalizable_correctly() {
        Externalizable::<String>::subgraph_builder()
            .stage(PipelineStep::SubgraphRequest)
            .id(String::default())
            .build();
        Externalizable::<String>::subgraph_builder()
            .stage(PipelineStep::SubgraphResponse)
            .id(String::default())
            .build();
    }

    #[test]
    #[should_panic]
    fn it_will_not_build_subgraph_externalizable_incorrectly() {
        Externalizable::<String>::subgraph_builder()
            .stage(PipelineStep::RouterRequest)
            .id(String::default())
            .build();
    }

    #[test]
    #[should_panic]
    fn it_will_not_build_subgraph_externalizable_incorrectly_router_response() {
        Externalizable::<String>::subgraph_builder()
            .stage(PipelineStep::RouterResponse)
            .id(String::default())
            .build();
    }

    #[tokio::test]
    async fn it_will_create_an_http_request_span() {
        async {
            let service = service_fn(|_req: http::Request<RouterBody>| async {
                // Emit an event so it is recorded while the request is in flight.
                tracing::info!("got request");
                Ok::<_, BoxError>(
                    Response::builder()
                        .status(200)
                        .body(router::body::from_bytes(vec![]))
                        .unwrap(),
                )
            });

            let externalizable = Externalizable::<String>::router_builder()
                .stage(PipelineStep::RouterRequest)
                .id("test-id".to_string())
                .build();

            // The mock returns an empty body, so decoding fails; the result is
            // ignored because only the recorded tracing output is under test.
            let _ = externalizable
                .call(service, "http://example.com/test")
                .await;
        }
        .with_subscriber(assert_snapshot_subscriber!())
        .await;
    }
}