1use std::collections::HashMap;
5use std::fmt::Debug;
6use std::sync::Arc;
7use std::time::Duration;
8
9use http::HeaderMap;
10use http::HeaderValue;
11use http::Method;
12use http::StatusCode;
13use http::header::ACCEPT;
14use http::header::CONTENT_TYPE;
15#[cfg(unix)]
16use hyperlocal;
17use opentelemetry::global::get_text_map_propagator;
18use schemars::JsonSchema;
19use serde::Deserialize;
20use serde::Serialize;
21use serde::de::DeserializeOwned;
22use strum::Display;
23use tower::BoxError;
24use tower::Service;
25use tracing::Instrument;
26
27use super::subgraph::SubgraphRequestId;
28use crate::Context;
29use crate::plugins::telemetry::consts::HTTP_REQUEST_SPAN_NAME;
30use crate::plugins::telemetry::otel::OpenTelemetrySpanExt;
31use crate::plugins::telemetry::reload::otel::prepare_context;
32use crate::query_planner::QueryPlan;
33use crate::services::http::HttpRequest;
34use crate::services::http::HttpResponse;
35#[cfg(unix)]
36use crate::services::parse_unix_socket_url;
37use crate::services::router;
38
/// One-second timeout value.
///
/// NOTE(review): presumably the fallback used for coprocessor calls when no
/// timeout is configured — the consumers are outside this file, confirm there.
pub(crate) const DEFAULT_EXTERNALIZATION_TIMEOUT: Duration = Duration::from_secs(1);

/// Protocol version stamped into `Externalizable::version` by every
/// constructor in this file.
pub(crate) const EXTERNALIZABLE_VERSION: u8 = 1;
43
// Identifies the pipeline stage an `Externalizable` payload was built for.
//
// Variants come in request/response pairs; each `*_new` constructor on
// `Externalizable` asserts that it is given one of the two variants of its own
// pair. The `Display` rendering of a variant is what ends up in
// `Externalizable::stage`.
#[derive(Clone, Debug, Display, Deserialize, PartialEq, Serialize, JsonSchema)]
pub(crate) enum PipelineStep {
    // Accepted by `router_new`.
    RouterRequest,
    RouterResponse,
    // Accepted by `supergraph_new`.
    SupergraphRequest,
    SupergraphResponse,
    // Accepted by `execution_new`.
    ExecutionRequest,
    ExecutionResponse,
    // Accepted by `subgraph_new`.
    SubgraphRequest,
    SubgraphResponse,
    // Accepted by `connector_new`.
    ConnectorRequest,
    ConnectorResponse,
}
57
58impl From<PipelineStep> for opentelemetry::Value {
59 fn from(val: PipelineStep) -> Self {
60 val.to_string().into()
61 }
62}
63
// Flow-control instruction carried in the `control` field of an
// `Externalizable` payload.
//
// Serialized with camelCase variant names (see the `rename_all` attribute).
#[derive(Clone, Debug, Default, Display, Deserialize, PartialEq, Serialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub(crate) enum Control {
    // Default variant; `get_http_status` maps it to `StatusCode::OK`.
    #[default]
    Continue,
    // Carries a raw status code; `get_http_status` converts it to a
    // `StatusCode` and fails if the number is not a valid one.
    Break(u16),
}
71
72impl Control {
73 #[allow(dead_code)]
74 fn new(status: u16) -> Self {
75 Control::Break(status)
76 }
77
78 pub(crate) fn get_http_status(&self) -> Result<StatusCode, BoxError> {
79 match self {
80 Control::Continue => Ok(StatusCode::OK),
81 Control::Break(code) => StatusCode::from_u16(*code).map_err(|e| e.into()),
82 }
83 }
84}
85
/// JSON payload exchanged with an external service through [`Externalizable::call`].
///
/// The same shape is used for the outgoing request (serialized) and for the
/// reply (deserialized). Keys are camelCase. Every optional field except `id`
/// is left out of the serialized output when it is `None`; `id` has no skip
/// attribute and is therefore always written. Field order is the serialized
/// key order, so it must not be shuffled casually.
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub(crate) struct Externalizable<T> {
    /// Protocol version; the constructors set it to `EXTERNALIZABLE_VERSION`.
    pub(crate) version: u8,
    /// `Display` rendering of the `PipelineStep` the payload was built for.
    pub(crate) stage: String,
    /// Flow-control instruction, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub(crate) control: Option<Control>,
    /// Identifier supplied to the constructor (always `Some` when built here).
    pub(crate) id: Option<String>,
    /// Header names mapped to all of their values.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub(crate) headers: Option<HashMap<String, Vec<String>>>,
    /// Stage-specific body.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub(crate) body: Option<T>,
    /// Request context.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub(crate) context: Option<Context>,
    /// Schema SDL; only the router, supergraph and execution constructors accept it.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub(crate) sdl: Option<String>,
    /// URI; only the subgraph and connector constructors accept it.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub(crate) uri: Option<String>,
    /// HTTP method.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub(crate) method: Option<String>,
    /// Path; only the router constructor accepts it.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub(crate) path: Option<String>,
    /// Service name; only the subgraph and connector constructors accept it.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub(crate) service_name: Option<String>,
    /// HTTP status code.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub(crate) status_code: Option<u16>,
    /// Only the supergraph and execution constructors accept it.
    /// NOTE(review): presumably flags that more response chunks follow — confirm.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub(crate) has_next: Option<bool>,
    /// Query plan; only the execution constructor accepts it. Private, unlike
    /// the other fields.
    #[serde(skip_serializing_if = "Option::is_none")]
    query_plan: Option<Arc<QueryPlan>>,
    /// Subgraph request id; only the subgraph constructor accepts it.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub(crate) subgraph_request_id: Option<SubgraphRequestId>,
}
119
120#[buildstructor::buildstructor]
121impl<T> Externalizable<T>
122where
123 T: Debug + DeserializeOwned + Serialize + Send + Sync,
124{
125 #[builder(visibility = "pub(crate)")]
128 fn router_new(
129 stage: PipelineStep,
130 control: Option<Control>,
131 id: String,
132 headers: Option<HashMap<String, Vec<String>>>,
133 body: Option<T>,
134 context: Option<Context>,
135 status_code: Option<u16>,
136 method: Option<String>,
137 path: Option<String>,
138 sdl: Option<String>,
139 ) -> Self {
140 assert!(matches!(
141 stage,
142 PipelineStep::RouterRequest | PipelineStep::RouterResponse
143 ));
144 Externalizable {
145 version: EXTERNALIZABLE_VERSION,
146 stage: stage.to_string(),
147 control,
148 id: Some(id),
149 headers,
150 body,
151 context,
152 status_code,
153 sdl,
154 uri: None,
155 path,
156 method,
157 service_name: None,
158 has_next: None,
159 query_plan: None,
160 subgraph_request_id: None,
161 }
162 }
163
164 #[builder(visibility = "pub(crate)")]
167 fn supergraph_new(
168 stage: PipelineStep,
169 control: Option<Control>,
170 id: String,
171 headers: Option<HashMap<String, Vec<String>>>,
172 body: Option<T>,
173 context: Option<Context>,
174 status_code: Option<u16>,
175 method: Option<String>,
176 sdl: Option<String>,
177 has_next: Option<bool>,
178 ) -> Self {
179 assert!(matches!(
180 stage,
181 PipelineStep::SupergraphRequest | PipelineStep::SupergraphResponse
182 ));
183 Externalizable {
184 version: EXTERNALIZABLE_VERSION,
185 stage: stage.to_string(),
186 control,
187 id: Some(id),
188 headers,
189 body,
190 context,
191 status_code,
192 sdl,
193 uri: None,
194 path: None,
195 method,
196 service_name: None,
197 has_next,
198 query_plan: None,
199 subgraph_request_id: None,
200 }
201 }
202
203 #[builder(visibility = "pub(crate)")]
206 fn execution_new(
207 stage: PipelineStep,
208 control: Option<Control>,
209 id: String,
210 headers: Option<HashMap<String, Vec<String>>>,
211 body: Option<T>,
212 context: Option<Context>,
213 status_code: Option<u16>,
214 method: Option<String>,
215 sdl: Option<String>,
216 has_next: Option<bool>,
217 query_plan: Option<Arc<QueryPlan>>,
218 ) -> Self {
219 assert!(matches!(
220 stage,
221 PipelineStep::ExecutionRequest | PipelineStep::ExecutionResponse
222 ));
223 Externalizable {
224 version: EXTERNALIZABLE_VERSION,
225 stage: stage.to_string(),
226 control,
227 id: Some(id),
228 headers,
229 body,
230 context,
231 status_code,
232 sdl,
233 uri: None,
234 path: None,
235 method,
236 service_name: None,
237 has_next,
238 query_plan,
239 subgraph_request_id: None,
240 }
241 }
242
243 #[builder(visibility = "pub(crate)")]
246 fn subgraph_new(
247 stage: PipelineStep,
248 control: Option<Control>,
249 id: String,
250 headers: Option<HashMap<String, Vec<String>>>,
251 body: Option<T>,
252 context: Option<Context>,
253 status_code: Option<u16>,
254 method: Option<String>,
255 service_name: Option<String>,
256 uri: Option<String>,
257 subgraph_request_id: Option<SubgraphRequestId>,
258 ) -> Self {
259 assert!(matches!(
260 stage,
261 PipelineStep::SubgraphRequest | PipelineStep::SubgraphResponse
262 ));
263 Externalizable {
264 version: EXTERNALIZABLE_VERSION,
265 stage: stage.to_string(),
266 control,
267 id: Some(id),
268 headers,
269 body,
270 context,
271 status_code,
272 sdl: None,
273 uri,
274 path: None,
275 method,
276 service_name,
277 has_next: None,
278 query_plan: None,
279 subgraph_request_id,
280 }
281 }
282
283 pub(crate) async fn call<C>(
284 self,
285 mut client: C,
286 uri: &str,
287 context: Context,
288 ) -> Result<Self, BoxError>
289 where
290 C: Service<HttpRequest, Response = HttpResponse, Error = BoxError>
291 + Clone
292 + Send
293 + Sync
294 + 'static,
295 {
296 tracing::debug!("forwarding json: {}", serde_json::to_string(&self)?);
297
298 #[cfg(unix)]
302 let converted_uri: http::Uri = if let Some(path) = uri.strip_prefix("unix://") {
303 let (socket_path, http_path) = parse_unix_socket_url(path);
304 hyperlocal::Uri::new(socket_path, http_path).into()
305 } else {
306 uri.parse()?
307 };
308
309 #[cfg(not(unix))]
310 let converted_uri: http::Uri = uri.parse()?;
311
312 let mut http_request = http::Request::builder()
313 .uri(converted_uri)
314 .method(Method::POST)
315 .header(ACCEPT, "application/json")
316 .header(CONTENT_TYPE, "application/json")
317 .body(router::body::from_bytes(serde_json::to_vec(&self)?))?;
318
319 let schema_uri = http_request.uri();
320 let host = schema_uri.host().unwrap_or_default();
321 let port = schema_uri.port_u16().unwrap_or_else(|| {
322 let scheme = schema_uri.scheme_str();
323 if scheme == Some("https") {
324 443
325 } else if scheme == Some("http") {
326 80
327 } else {
328 0
329 }
330 });
331
332 let otel_name = format!("POST {uri}");
333 let http_req_span = tracing::info_span!(HTTP_REQUEST_SPAN_NAME,
334 "otel.kind" = "CLIENT",
335 "http.request.method" = "POST",
336 "server.address" = %host,
337 "server.port" = %port,
338 "url.full" = %uri,
339 "otel.name" = %otel_name,
340 "otel.original_name" = "http_request",
341 );
342
343 get_text_map_propagator(|propagator| {
344 propagator.inject_context(
345 &prepare_context(http_req_span.context()),
346 &mut crate::otel_compat::HeaderInjector(http_request.headers_mut()),
347 );
348 });
349
350 let request = HttpRequest {
351 http_request,
352 context,
353 };
354
355 let response = client.call(request).instrument(http_req_span).await?;
356 router::body::into_bytes(response.http_response.into_body())
357 .await
358 .map_err(BoxError::from)
359 .and_then(|bytes| serde_json::from_slice(&bytes).map_err(BoxError::from))
360 }
361
362 #[builder(visibility = "pub(crate)")]
365 fn connector_new(
366 stage: PipelineStep,
367 control: Option<Control>,
368 id: String,
369 headers: Option<HashMap<String, Vec<String>>>,
370 body: Option<T>,
371 context: Option<Context>,
372 status_code: Option<u16>,
373 method: Option<String>,
374 service_name: Option<String>,
375 uri: Option<String>,
376 ) -> Self {
377 assert!(matches!(
378 stage,
379 PipelineStep::ConnectorRequest | PipelineStep::ConnectorResponse
380 ));
381 Externalizable {
382 version: EXTERNALIZABLE_VERSION,
383 stage: stage.to_string(),
384 control,
385 id: Some(id),
386 headers,
387 body,
388 context,
389 status_code,
390 sdl: None,
391 uri,
392 path: None,
393 method,
394 service_name,
395 has_next: None,
396 query_plan: None,
397 subgraph_request_id: None,
398 }
399 }
400}
401
402pub(crate) fn externalize_header_map(
404 input: &HeaderMap<HeaderValue>,
405) -> HashMap<String, Vec<String>> {
406 let mut output = HashMap::with_capacity(input.keys_len());
407 for (k, v) in input {
408 let k = k.as_str().to_owned();
409 match String::from_utf8(v.as_bytes().to_vec()) {
410 Ok(v) => output.entry(k).or_insert_with(Vec::new).push(v),
411 Err(e) => tracing::warn!(
412 "unable to convert header value to utf-8 for {}, will not be sent to coprocessor: {}",
413 k,
414 e
415 ),
416 }
417 }
418 output
419}
420
421#[cfg(test)]
422mod test {
423 use http::Response;
424 use tower::service_fn;
425 use tracing_futures::WithSubscriber;
426
427 use super::*;
428 use crate::assert_snapshot_subscriber;
429 use crate::test_harness::tracing_test;
430
431 #[test]
432 fn it_will_build_router_externalizable_correctly() {
433 Externalizable::<String>::router_builder()
434 .stage(PipelineStep::RouterRequest)
435 .id(String::default())
436 .build();
437 Externalizable::<String>::router_builder()
438 .stage(PipelineStep::RouterResponse)
439 .id(String::default())
440 .build();
441 }
442
443 #[test]
444 #[should_panic]
445 fn it_will_not_build_router_externalizable_incorrectly() {
446 Externalizable::<String>::router_builder()
447 .stage(PipelineStep::SubgraphRequest)
448 .id(String::default())
449 .build();
450 Externalizable::<String>::router_builder()
451 .stage(PipelineStep::SubgraphResponse)
452 .id(String::default())
453 .build();
454 }
455
456 #[test]
457 #[should_panic]
458 fn it_will_not_build_router_externalizable_incorrectl_supergraph() {
459 Externalizable::<String>::router_builder()
460 .stage(PipelineStep::SupergraphRequest)
461 .id(String::default())
462 .build();
463 Externalizable::<String>::router_builder()
464 .stage(PipelineStep::SupergraphResponse)
465 .id(String::default())
466 .build();
467 }
468
469 #[test]
470 fn it_will_build_subgraph_externalizable_correctly() {
471 Externalizable::<String>::subgraph_builder()
472 .stage(PipelineStep::SubgraphRequest)
473 .id(String::default())
474 .build();
475 Externalizable::<String>::subgraph_builder()
476 .stage(PipelineStep::SubgraphResponse)
477 .id(String::default())
478 .build();
479 }
480
481 #[test]
482 #[should_panic]
483 fn it_will_not_build_subgraph_externalizable_incorrectly() {
484 Externalizable::<String>::subgraph_builder()
485 .stage(PipelineStep::RouterRequest)
486 .id(String::default())
487 .build();
488 Externalizable::<String>::subgraph_builder()
489 .stage(PipelineStep::RouterResponse)
490 .id(String::default())
491 .build();
492 }
493
494 #[test]
495 fn it_will_externalize_headers_correctly() {
496 let _guard = tracing_test::dispatcher_guard();
497
498 let mut headers = HeaderMap::new();
499 headers.insert("content-type", HeaderValue::from_static("application/json"));
500 headers.insert("x-test-header", unsafe {
502 HeaderValue::from_maybe_shared_unchecked(b"invalid\xc0\xaf")
503 });
504
505 let externalized = externalize_header_map(&headers);
506
507 assert_eq!(
509 externalized,
510 HashMap::from([(
511 "content-type".to_string(),
512 vec!["application/json".to_string()]
513 )])
514 );
515
516 assert!(tracing_test::logs_contain(
517 "unable to convert header value to utf-8 for x-test-header, will not be sent to coprocessor: invalid utf-8 sequence of 1 bytes from index 7"
518 ));
519 }
520
521 #[tokio::test]
522 async fn it_will_create_an_http_request_span() {
523 use crate::services::http::HttpRequest;
524 use crate::services::http::HttpResponse;
525
526 async {
527 let service = service_fn(|req: HttpRequest| async move {
530 tracing::info!("got request");
531 Ok::<_, BoxError>(HttpResponse {
532 http_response: Response::builder()
533 .status(200)
534 .body(router::body::from_bytes(
535 serde_json::to_vec(&serde_json::json!({
536 "version": EXTERNALIZABLE_VERSION,
537 "stage": "RouterRequest",
538 "control": "continue",
539 "id": "test-id",
540 }))
541 .unwrap(),
542 ))
543 .unwrap(),
544 context: req.context,
545 })
546 });
547
548 let externalizable = Externalizable::<String>::router_builder()
549 .stage(PipelineStep::RouterRequest)
550 .id("test-id".to_string())
551 .build();
552
553 let _ = externalizable
555 .call(service, "http://example.com/test", Context::new())
556 .await;
557 }
558 .with_subscriber(assert_snapshot_subscriber!())
559 .await;
560 }
561}