use std::collections::HashMap;
use std::fmt::Debug;
use std::sync::Arc;
use std::time::Duration;
use http::HeaderMap;
use http::HeaderValue;
use http::Method;
use http::StatusCode;
use http::header::ACCEPT;
use http::header::CONTENT_TYPE;
#[cfg(unix)]
use hyperlocal;
use opentelemetry::global::get_text_map_propagator;
use schemars::JsonSchema;
use serde::Deserialize;
use serde::Serialize;
use serde::de::DeserializeOwned;
use strum::Display;
use tower::BoxError;
use tower::Service;
use tracing::Instrument;
use super::subgraph::SubgraphRequestId;
use crate::Context;
use crate::plugins::telemetry::consts::HTTP_REQUEST_SPAN_NAME;
use crate::plugins::telemetry::otel::OpenTelemetrySpanExt;
use crate::plugins::telemetry::reload::otel::prepare_context;
use crate::query_planner::QueryPlan;
use crate::services::http::HttpRequest;
use crate::services::http::HttpResponse;
#[cfg(unix)]
use crate::services::parse_unix_socket_url;
use crate::services::router;
pub(crate) const DEFAULT_EXTERNALIZATION_TIMEOUT: Duration = Duration::from_secs(1);
pub(crate) const EXTERNALIZABLE_VERSION: u8 = 1;
#[derive(Clone, Debug, Display, Deserialize, PartialEq, Serialize, JsonSchema)]
pub(crate) enum PipelineStep {
RouterRequest,
RouterResponse,
SupergraphRequest,
SupergraphResponse,
ExecutionRequest,
ExecutionResponse,
SubgraphRequest,
SubgraphResponse,
ConnectorRequest,
ConnectorResponse,
}
impl From<PipelineStep> for opentelemetry::Value {
fn from(val: PipelineStep) -> Self {
val.to_string().into()
}
}
#[derive(Clone, Debug, Default, Display, Deserialize, PartialEq, Serialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub(crate) enum Control {
#[default]
Continue,
Break(u16),
}
impl Control {
#[allow(dead_code)]
fn new(status: u16) -> Self {
Control::Break(status)
}
pub(crate) fn get_http_status(&self) -> Result<StatusCode, BoxError> {
match self {
Control::Continue => Ok(StatusCode::OK),
Control::Break(code) => StatusCode::from_u16(*code).map_err(|e| e.into()),
}
}
}
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub(crate) struct Externalizable<T> {
pub(crate) version: u8,
pub(crate) stage: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) control: Option<Control>,
pub(crate) id: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) headers: Option<HashMap<String, Vec<String>>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) body: Option<T>,
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) context: Option<Context>,
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) sdl: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) uri: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) method: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) path: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) service_name: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) status_code: Option<u16>,
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) has_next: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
query_plan: Option<Arc<QueryPlan>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) subgraph_request_id: Option<SubgraphRequestId>,
}
#[buildstructor::buildstructor]
impl<T> Externalizable<T>
where
T: Debug + DeserializeOwned + Serialize + Send + Sync,
{
#[builder(visibility = "pub(crate)")]
fn router_new(
stage: PipelineStep,
control: Option<Control>,
id: String,
headers: Option<HashMap<String, Vec<String>>>,
body: Option<T>,
context: Option<Context>,
status_code: Option<u16>,
method: Option<String>,
path: Option<String>,
sdl: Option<String>,
) -> Self {
assert!(matches!(
stage,
PipelineStep::RouterRequest | PipelineStep::RouterResponse
));
Externalizable {
version: EXTERNALIZABLE_VERSION,
stage: stage.to_string(),
control,
id: Some(id),
headers,
body,
context,
status_code,
sdl,
uri: None,
path,
method,
service_name: None,
has_next: None,
query_plan: None,
subgraph_request_id: None,
}
}
#[builder(visibility = "pub(crate)")]
fn supergraph_new(
stage: PipelineStep,
control: Option<Control>,
id: String,
headers: Option<HashMap<String, Vec<String>>>,
body: Option<T>,
context: Option<Context>,
status_code: Option<u16>,
method: Option<String>,
sdl: Option<String>,
has_next: Option<bool>,
) -> Self {
assert!(matches!(
stage,
PipelineStep::SupergraphRequest | PipelineStep::SupergraphResponse
));
Externalizable {
version: EXTERNALIZABLE_VERSION,
stage: stage.to_string(),
control,
id: Some(id),
headers,
body,
context,
status_code,
sdl,
uri: None,
path: None,
method,
service_name: None,
has_next,
query_plan: None,
subgraph_request_id: None,
}
}
#[builder(visibility = "pub(crate)")]
fn execution_new(
stage: PipelineStep,
control: Option<Control>,
id: String,
headers: Option<HashMap<String, Vec<String>>>,
body: Option<T>,
context: Option<Context>,
status_code: Option<u16>,
method: Option<String>,
sdl: Option<String>,
has_next: Option<bool>,
query_plan: Option<Arc<QueryPlan>>,
) -> Self {
assert!(matches!(
stage,
PipelineStep::ExecutionRequest | PipelineStep::ExecutionResponse
));
Externalizable {
version: EXTERNALIZABLE_VERSION,
stage: stage.to_string(),
control,
id: Some(id),
headers,
body,
context,
status_code,
sdl,
uri: None,
path: None,
method,
service_name: None,
has_next,
query_plan,
subgraph_request_id: None,
}
}
#[builder(visibility = "pub(crate)")]
fn subgraph_new(
stage: PipelineStep,
control: Option<Control>,
id: String,
headers: Option<HashMap<String, Vec<String>>>,
body: Option<T>,
context: Option<Context>,
status_code: Option<u16>,
method: Option<String>,
service_name: Option<String>,
uri: Option<String>,
subgraph_request_id: Option<SubgraphRequestId>,
) -> Self {
assert!(matches!(
stage,
PipelineStep::SubgraphRequest | PipelineStep::SubgraphResponse
));
Externalizable {
version: EXTERNALIZABLE_VERSION,
stage: stage.to_string(),
control,
id: Some(id),
headers,
body,
context,
status_code,
sdl: None,
uri,
path: None,
method,
service_name,
has_next: None,
query_plan: None,
subgraph_request_id,
}
}
pub(crate) async fn call<C>(
self,
mut client: C,
uri: &str,
context: Context,
) -> Result<Self, BoxError>
where
C: Service<HttpRequest, Response = HttpResponse, Error = BoxError>
+ Clone
+ Send
+ Sync
+ 'static,
{
tracing::debug!("forwarding json: {}", serde_json::to_string(&self)?);
#[cfg(unix)]
let converted_uri: http::Uri = if let Some(path) = uri.strip_prefix("unix://") {
let (socket_path, http_path) = parse_unix_socket_url(path);
hyperlocal::Uri::new(socket_path, http_path).into()
} else {
uri.parse()?
};
#[cfg(not(unix))]
let converted_uri: http::Uri = uri.parse()?;
let mut http_request = http::Request::builder()
.uri(converted_uri)
.method(Method::POST)
.header(ACCEPT, "application/json")
.header(CONTENT_TYPE, "application/json")
.body(router::body::from_bytes(serde_json::to_vec(&self)?))?;
let schema_uri = http_request.uri();
let host = schema_uri.host().unwrap_or_default();
let port = schema_uri.port_u16().unwrap_or_else(|| {
let scheme = schema_uri.scheme_str();
if scheme == Some("https") {
443
} else if scheme == Some("http") {
80
} else {
0
}
});
let otel_name = format!("POST {uri}");
let http_req_span = tracing::info_span!(HTTP_REQUEST_SPAN_NAME,
"otel.kind" = "CLIENT",
"http.request.method" = "POST",
"server.address" = %host,
"server.port" = %port,
"url.full" = %uri,
"otel.name" = %otel_name,
"otel.original_name" = "http_request",
);
get_text_map_propagator(|propagator| {
propagator.inject_context(
&prepare_context(http_req_span.context()),
&mut crate::otel_compat::HeaderInjector(http_request.headers_mut()),
);
});
let request = HttpRequest {
http_request,
context,
};
let response = client.call(request).instrument(http_req_span).await?;
router::body::into_bytes(response.http_response.into_body())
.await
.map_err(BoxError::from)
.and_then(|bytes| serde_json::from_slice(&bytes).map_err(BoxError::from))
}
#[builder(visibility = "pub(crate)")]
fn connector_new(
stage: PipelineStep,
control: Option<Control>,
id: String,
headers: Option<HashMap<String, Vec<String>>>,
body: Option<T>,
context: Option<Context>,
status_code: Option<u16>,
method: Option<String>,
service_name: Option<String>,
uri: Option<String>,
) -> Self {
assert!(matches!(
stage,
PipelineStep::ConnectorRequest | PipelineStep::ConnectorResponse
));
Externalizable {
version: EXTERNALIZABLE_VERSION,
stage: stage.to_string(),
control,
id: Some(id),
headers,
body,
context,
status_code,
sdl: None,
uri,
path: None,
method,
service_name,
has_next: None,
query_plan: None,
subgraph_request_id: None,
}
}
}
pub(crate) fn externalize_header_map(
input: &HeaderMap<HeaderValue>,
) -> HashMap<String, Vec<String>> {
let mut output = HashMap::with_capacity(input.keys_len());
for (k, v) in input {
let k = k.as_str().to_owned();
match String::from_utf8(v.as_bytes().to_vec()) {
Ok(v) => output.entry(k).or_insert_with(Vec::new).push(v),
Err(e) => tracing::warn!(
"unable to convert header value to utf-8 for {}, will not be sent to coprocessor: {}",
k,
e
),
}
}
output
}
#[cfg(test)]
mod test {
use http::Response;
use tower::service_fn;
use tracing_futures::WithSubscriber;
use super::*;
use crate::assert_snapshot_subscriber;
use crate::test_harness::tracing_test;
#[test]
fn it_will_build_router_externalizable_correctly() {
Externalizable::<String>::router_builder()
.stage(PipelineStep::RouterRequest)
.id(String::default())
.build();
Externalizable::<String>::router_builder()
.stage(PipelineStep::RouterResponse)
.id(String::default())
.build();
}
#[test]
#[should_panic]
fn it_will_not_build_router_externalizable_incorrectly() {
Externalizable::<String>::router_builder()
.stage(PipelineStep::SubgraphRequest)
.id(String::default())
.build();
Externalizable::<String>::router_builder()
.stage(PipelineStep::SubgraphResponse)
.id(String::default())
.build();
}
#[test]
#[should_panic]
fn it_will_not_build_router_externalizable_incorrectl_supergraph() {
Externalizable::<String>::router_builder()
.stage(PipelineStep::SupergraphRequest)
.id(String::default())
.build();
Externalizable::<String>::router_builder()
.stage(PipelineStep::SupergraphResponse)
.id(String::default())
.build();
}
#[test]
fn it_will_build_subgraph_externalizable_correctly() {
Externalizable::<String>::subgraph_builder()
.stage(PipelineStep::SubgraphRequest)
.id(String::default())
.build();
Externalizable::<String>::subgraph_builder()
.stage(PipelineStep::SubgraphResponse)
.id(String::default())
.build();
}
#[test]
#[should_panic]
fn it_will_not_build_subgraph_externalizable_incorrectly() {
Externalizable::<String>::subgraph_builder()
.stage(PipelineStep::RouterRequest)
.id(String::default())
.build();
Externalizable::<String>::subgraph_builder()
.stage(PipelineStep::RouterResponse)
.id(String::default())
.build();
}
#[test]
fn it_will_externalize_headers_correctly() {
let _guard = tracing_test::dispatcher_guard();
let mut headers = HeaderMap::new();
headers.insert("content-type", HeaderValue::from_static("application/json"));
headers.insert("x-test-header", unsafe {
HeaderValue::from_maybe_shared_unchecked(b"invalid\xc0\xaf")
});
let externalized = externalize_header_map(&headers);
assert_eq!(
externalized,
HashMap::from([(
"content-type".to_string(),
vec!["application/json".to_string()]
)])
);
assert!(tracing_test::logs_contain(
"unable to convert header value to utf-8 for x-test-header, will not be sent to coprocessor: invalid utf-8 sequence of 1 bytes from index 7"
));
}
#[tokio::test]
async fn it_will_create_an_http_request_span() {
use crate::services::http::HttpRequest;
use crate::services::http::HttpResponse;
async {
let service = service_fn(|req: HttpRequest| async move {
tracing::info!("got request");
Ok::<_, BoxError>(HttpResponse {
http_response: Response::builder()
.status(200)
.body(router::body::from_bytes(
serde_json::to_vec(&serde_json::json!({
"version": EXTERNALIZABLE_VERSION,
"stage": "RouterRequest",
"control": "continue",
"id": "test-id",
}))
.unwrap(),
))
.unwrap(),
context: req.context,
})
});
let externalizable = Externalizable::<String>::router_builder()
.stage(PipelineStep::RouterRequest)
.id("test-id".to_string())
.build();
let _ = externalizable
.call(service, "http://example.com/test", Context::new())
.await;
}
.with_subscriber(assert_snapshot_subscriber!())
.await;
}
}