#![allow(missing_docs)]
use std::collections::HashSet;
use std::fmt::Display;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use apollo_compiler::validation::Valid;
use http::StatusCode;
use http::Version;
use itertools::Itertools;
use multimap::MultiMap;
use serde::Deserialize;
use serde::Serialize;
use serde_json_bytes::ByteString;
use serde_json_bytes::Map as JsonMap;
use serde_json_bytes::Value;
use sha2::Digest;
use sha2::Sha256;
use static_assertions::assert_impl_all;
use tokio::sync::broadcast;
use tokio::sync::mpsc;
use tokio_stream::Stream;
use tower::BoxError;
use crate::Context;
use crate::batching::BatchQuery;
use crate::error::Error;
use crate::graphql;
use crate::http_ext::TryIntoHeaderName;
use crate::http_ext::TryIntoHeaderValue;
use crate::http_ext::header_map;
use crate::json_ext::Object;
use crate::json_ext::Path;
use crate::plugins::authentication::APOLLO_AUTHENTICATION_JWT_CLAIMS;
use crate::plugins::authentication::subgraph::SigningParamsConfig;
use crate::plugins::authorization::CacheKeyMetadata;
use crate::plugins::response_cache::cache_control::CacheControl;
use crate::query_planner::fetch::OperationKind;
use crate::spec::QueryHash;
/// A boxed, type-erased `tower` service that accepts a subgraph [`Request`] and
/// yields a subgraph [`Response`] or a [`BoxError`].
pub type BoxService = tower::util::BoxService<Request, Response, BoxError>;
/// Same request, response and error types as [`BoxService`], but backed by
/// `tower::util::BoxCloneService`.
pub type BoxCloneService = tower::util::BoxCloneService<Request, Response, BoxError>;
/// Outcome of a subgraph service call: a [`Response`] on success, a [`BoxError`] otherwise.
pub type ServiceResult = Result<Response, BoxError>;
/// A pinned, heap-allocated, `Send + Sync` stream whose items are [`graphql::Response`]s.
pub(crate) type BoxGqlStream = Pin<Box<dyn Stream<Item = graphql::Response> + Send + Sync>>;
/// Identifier carried by a subgraph [`Request`] and its [`Response`], stored as a plain `String`.
///
/// [`SubgraphRequestId::new`] fills it with a freshly generated, lower-case,
/// hyphenated v4 UUID.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct SubgraphRequestId(pub String);
impl Display for SubgraphRequestId {
    /// Writes the wrapped identifier string verbatim.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.0.as_str())
    }
}
// Compile-time check that `Request` can be sent across threads.
assert_impl_all!(Request: Send);
/// A request addressed to a subgraph service.
///
/// Built through the `buildstructor`-generated builders of `Request::new` /
/// `Request::fake_new`.
#[non_exhaustive]
pub struct Request {
/// The supergraph-level HTTP request, shared behind an `Arc` (clones of this
/// `Request` share the same instance).
pub supergraph_request: Arc<http::Request<graphql::Request>>,
/// The HTTP request carrying the GraphQL request for the subgraph; this is the
/// request that `to_sha256` fingerprints.
pub subgraph_request: http::Request<graphql::Request>,
/// Kind of the operation; `fake_new` falls back to `OperationKind::Query`.
pub operation_kind: OperationKind,
/// The request [`Context`]; read by `is_part_of_batch` and `to_sha256`.
pub context: Context,
/// Name of the subgraph (as supplied to the constructor).
pub(crate) subgraph_name: String,
/// Optional sender through which [`BoxGqlStream`]s can be handed over.
/// NOTE(review): presumably only set for subscription operations — confirm with callers.
pub(crate) subscription_stream: Option<mpsc::Sender<BoxGqlStream>>,
/// Optional broadcast receiver; cloning a `Request` resubscribes to it instead of
/// sharing the receiver.
pub(crate) connection_closed_signal: Option<broadcast::Receiver<()>>,
/// Query hash; the constructor initialises it to `QueryHash::default()`.
pub(crate) query_hash: Arc<QueryHash>,
/// Authorization cache-key metadata; the constructor initialises it to its default.
pub(crate) authorization: Arc<CacheKeyMetadata>,
/// Validated executable document, when available; consulted by
/// `root_operation_fields`.
pub(crate) executable_document: Option<Arc<Valid<apollo_compiler::ExecutableDocument>>>,
/// Identifier generated by the constructor and preserved when the request is cloned.
pub(crate) id: SubgraphRequestId,
}
#[buildstructor::buildstructor]
impl Request {
#[builder(visibility = "pub")]
fn new(
supergraph_request: Arc<http::Request<graphql::Request>>,
subgraph_request: http::Request<graphql::Request>,
operation_kind: OperationKind,
context: Context,
subscription_stream: Option<mpsc::Sender<BoxGqlStream>>,
subgraph_name: String,
connection_closed_signal: Option<broadcast::Receiver<()>>,
executable_document: Option<Arc<Valid<apollo_compiler::ExecutableDocument>>>,
) -> Request {
Self {
supergraph_request,
subgraph_request,
operation_kind,
context,
subgraph_name,
subscription_stream,
connection_closed_signal,
query_hash: QueryHash::default().into(),
authorization: Default::default(),
executable_document,
id: SubgraphRequestId::new(),
}
}
#[builder(visibility = "pub")]
fn fake_new(
supergraph_request: Option<Arc<http::Request<graphql::Request>>>,
subgraph_request: Option<http::Request<graphql::Request>>,
operation_kind: Option<OperationKind>,
context: Option<Context>,
subscription_stream: Option<mpsc::Sender<BoxGqlStream>>,
subgraph_name: Option<String>,
connection_closed_signal: Option<broadcast::Receiver<()>>,
) -> Request {
Request::new(
supergraph_request.unwrap_or_default(),
subgraph_request.unwrap_or_default(),
operation_kind.unwrap_or(OperationKind::Query),
context.unwrap_or_default(),
subscription_stream,
subgraph_name.unwrap_or_default(),
connection_closed_signal,
None,
)
}
pub(crate) fn is_part_of_batch(&self) -> bool {
self.context
.extensions()
.with_lock(|lock| lock.contains_key::<BatchQuery>())
}
pub(crate) fn subgraph_operation_name(&self) -> Option<&str> {
self.subgraph_request.body().operation_name.as_deref()
}
pub(crate) fn root_operation_fields(&self) -> Vec<String> {
self.executable_document
.as_ref()
.and_then(|executable_document| {
let operation_name = self.subgraph_operation_name();
Some(
executable_document
.operations
.get(operation_name)
.ok()?
.root_fields(executable_document)
.map(|f| f.name.to_string())
.collect(),
)
})
.unwrap_or_default()
}
}
impl Clone for Request {
fn clone(&self) -> Self {
let mut builder = http::Request::builder()
.method(self.subgraph_request.method())
.version(self.subgraph_request.version())
.uri(self.subgraph_request.uri());
{
let headers = builder.headers_mut().unwrap();
headers.extend(
self.subgraph_request
.headers()
.iter()
.map(|(name, value)| (name.clone(), value.clone())),
);
}
let mut subgraph_request = builder.body(self.subgraph_request.body().clone()).unwrap();
if let Some(signing_params) = self
.subgraph_request
.extensions()
.get::<Arc<SigningParamsConfig>>()
.cloned()
{
subgraph_request.extensions_mut().insert(signing_params);
}
Self {
supergraph_request: self.supergraph_request.clone(),
subgraph_request,
operation_kind: self.operation_kind,
context: self.context.clone(),
subgraph_name: self.subgraph_name.clone(),
subscription_stream: self.subscription_stream.clone(),
connection_closed_signal: self
.connection_closed_signal
.as_ref()
.map(|s| s.resubscribe()),
query_hash: self.query_hash.clone(),
authorization: self.authorization.clone(),
executable_document: self.executable_document.clone(),
id: self.id.clone(),
}
}
}
impl SubgraphRequestId {
    /// Generates a new identifier: a random v4 UUID rendered in lower-case
    /// hyphenated form.
    pub fn new() -> Self {
        let mut buffer = uuid::Uuid::encode_buffer();
        let uuid = uuid::Uuid::new_v4();
        let encoded = uuid.as_hyphenated().encode_lower(&mut buffer);
        SubgraphRequestId(String::from(&*encoded))
    }
}
impl std::ops::Deref for SubgraphRequestId {
    type Target = str;

    /// Borrows the identifier as a string slice.
    fn deref(&self) -> &str {
        self.0.as_str()
    }
}
impl Default for SubgraphRequestId {
fn default() -> Self {
Self::new()
}
}
// Compile-time check that `Response` can be sent across threads.
assert_impl_all!(Response: Send);
/// A response produced by a subgraph service.
#[derive(Debug)]
#[non_exhaustive]
pub struct Response {
/// The HTTP response wrapping the GraphQL response body.
pub response: http::Response<graphql::Response>,
/// Name of the subgraph (as supplied to the constructor).
pub(crate) subgraph_name: String,
/// The [`Context`] supplied to the constructor.
pub context: Context,
/// Identifier supplied to the constructor; `Response::new` generates a fresh one
/// when none is given.
/// NOTE(review): presumably mirrors the id of the originating `Request` — confirm with callers.
pub(crate) id: SubgraphRequestId,
}
#[buildstructor::buildstructor]
impl Response {
pub(crate) fn new_from_response(
response: http::Response<graphql::Response>,
context: Context,
subgraph_name: String,
id: SubgraphRequestId,
) -> Self {
Self {
response,
context,
subgraph_name,
id,
}
}
#[builder(visibility = "pub")]
fn new(
label: Option<String>,
data: Option<Value>,
path: Option<Path>,
errors: Vec<Error>,
extensions: Object,
status_code: Option<StatusCode>,
context: Context,
headers: Option<http::HeaderMap<http::HeaderValue>>,
subgraph_name: String,
id: Option<SubgraphRequestId>,
) -> Self {
let res = graphql::Response::builder()
.and_label(label)
.data(data.unwrap_or_default())
.and_path(path)
.errors(errors)
.extensions(extensions)
.build();
let mut response = http::Response::builder()
.status(status_code.unwrap_or(StatusCode::OK))
.body(res)
.expect("Response is serializable; qed");
*response.headers_mut() = headers.unwrap_or_default();
let id = id.unwrap_or_default();
Self {
response,
context,
subgraph_name,
id,
}
}
#[builder(visibility = "pub")]
fn fake_new(
label: Option<String>,
data: Option<Value>,
path: Option<Path>,
errors: Vec<Error>,
extensions: JsonMap<ByteString, Value>,
status_code: Option<StatusCode>,
context: Option<Context>,
headers: Option<http::HeaderMap<http::HeaderValue>>,
subgraph_name: Option<String>,
id: Option<SubgraphRequestId>,
) -> Self {
Self::new(
label,
data,
path,
errors,
extensions,
status_code,
context.unwrap_or_default(),
headers,
subgraph_name.unwrap_or_default(),
id,
)
}
#[builder(visibility = "pub")]
fn fake2_new(
label: Option<String>,
data: Option<Value>,
path: Option<Path>,
errors: Vec<Error>,
extensions: JsonMap<ByteString, Value>,
status_code: Option<StatusCode>,
context: Option<Context>,
headers: MultiMap<TryIntoHeaderName, TryIntoHeaderValue>,
subgraph_name: Option<String>,
id: Option<SubgraphRequestId>,
) -> Result<Response, BoxError> {
Ok(Self::new(
label,
data,
path,
errors,
extensions,
status_code,
context.unwrap_or_default(),
Some(header_map(headers)?),
subgraph_name.unwrap_or_default(),
id,
))
}
#[builder(visibility = "pub")]
fn error_new(
errors: Vec<Error>,
status_code: Option<StatusCode>,
context: Context,
subgraph_name: String,
id: Option<SubgraphRequestId>,
) -> Self {
Self::new(
Default::default(),
Default::default(),
Default::default(),
errors,
Default::default(),
status_code,
context,
Default::default(),
subgraph_name,
id,
)
}
pub(crate) fn subgraph_cache_control(
&self,
default_ttl: Option<Duration>,
) -> Result<CacheControl, BoxError> {
Ok(CacheControl::try_from(self.response.headers())?.with_default_ttl(default_ttl))
}
pub(crate) fn get_from_extensions(&self, key: &str) -> Option<&Value> {
self.response.body().extensions.get(key)
}
}
impl Request {
    /// Computes a hex-encoded SHA-256 fingerprint of this subgraph request.
    ///
    /// The digest covers, in this order:
    /// 1. the HTTP method and version;
    /// 2. the URI scheme, authority, path and query, each behind its own tag;
    /// 3. every header whose name is not in `ignored_headers`, sorted by name;
    /// 4. the JWT claims stored in the context under
    ///    `APOLLO_AUTHENTICATION_JWT_CLAIMS`, unless `ignore_auth_context` is `true`;
    /// 5. the GraphQL operation name, query, variables and extensions.
    ///
    /// Every section is introduced by a distinct `\0`-prefixed tag so that bytes
    /// belonging to one section cannot be mistaken for bytes of another.
    ///
    /// `ignored_headers` is matched exactly against `HeaderName::as_str()`.
    /// NOTE(review): `http` header names are lower-case, so entries presumably have
    /// to be lower-case as well — confirm at the call sites.
    pub(crate) fn to_sha256(
        &self,
        ignored_headers: &HashSet<String>,
        ignore_auth_context: bool,
    ) -> String {
        let mut hasher = Sha256::new();
        let http_req = &self.subgraph_request;
        hasher.update(http_req.method().as_str().as_bytes());

        // Matching on the version yields a `&'static str`, so nothing is allocated.
        let version = match http_req.version() {
            Version::HTTP_09 => "HTTP/0.9",
            Version::HTTP_10 => "HTTP/1.0",
            Version::HTTP_11 => "HTTP/1.1",
            Version::HTTP_2 => "HTTP/2.0",
            Version::HTTP_3 => "HTTP/3.0",
            _ => "unknown",
        };
        hasher.update(version.as_bytes());

        // Each URI component is tagged: without the tags `http` + `sa.com` and
        // `https` + `a.com` would feed identical bytes to the hasher.
        let uri = http_req.uri();
        if let Some(scheme) = uri.scheme() {
            hasher.update(b"\0S");
            hasher.update(scheme.as_str().as_bytes());
        }
        if let Some(authority) = uri.authority() {
            hasher.update(b"\0A");
            hasher.update(authority.as_str().as_bytes());
        }
        // The path is part of the request identity: two endpoints served by the same
        // authority must not share a fingerprint.
        hasher.update(b"\0P");
        hasher.update(uri.path().as_bytes());
        if let Some(query) = uri.query() {
            // `\0Q` is already taken by the GraphQL query section below.
            hasher.update(b"\0?");
            hasher.update(query.as_bytes());
        }

        hasher.update(b"\0H");
        sort_and_hash(
            &mut hasher,
            http_req
                .headers()
                .iter()
                .filter(|(name, _)| !ignored_headers.contains(name.as_str()))
                .map(|(name, value)| (name.as_str().as_bytes(), value.as_bytes())),
        );

        if !ignore_auth_context
            && let Some(claim) = self
                .context
                .get_json_value(APOLLO_AUTHENTICATION_JWT_CLAIMS)
        {
            hasher.update(b"\0C");
            // The claims are hashed through their `Debug` rendering.
            hasher.update(format!("{claim:?}").as_bytes());
        }

        let body = http_req.body();
        if let Some(operation_name) = &body.operation_name {
            hasher.update(b"\0O");
            hasher.update(operation_name.as_bytes());
        }
        if let Some(query) = &body.query {
            hasher.update(b"\0Q");
            hasher.update(query.as_bytes());
        }

        hasher.update(b"\0V");
        sort_and_hash(
            &mut hasher,
            body.variables
                .iter()
                .map(|(k, v)| (k.inner(), v.to_bytes())),
        );

        hasher.update(b"\0E");
        sort_and_hash(
            &mut hasher,
            body.extensions
                .iter()
                .map(|(k, v)| (k.inner(), v.to_bytes())),
        );

        hex::encode(hasher.finalize())
    }
}
fn sort_and_hash(
hasher: &mut Sha256,
pairs: impl IntoIterator<Item = (impl AsRef<[u8]>, impl AsRef<[u8]>)>,
) {
let sorted = pairs
.into_iter()
.sorted_unstable_by(|a, b| a.0.as_ref().cmp(b.0.as_ref()));
for (k, v) in sorted {
hasher.update(k.as_ref());
hasher.update([0]);
hasher.update(v.as_ref());
hasher.update([0]);
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_subgraph_request_hash() {
    // Builds a request whose only varying part is the `public_header` value.
    let request_with_public_header = |value: &'static str| {
        Request::fake_builder()
            .subgraph_request(
                http::Request::builder()
                    .header("public_header", value)
                    .header("auth", "my_token")
                    .body(graphql::Request::default())
                    .unwrap(),
            )
            .build()
    };

    // With `public_header` ignored, the differing values must not affect the hash.
    let first = request_with_public_header("value");
    let second = request_with_public_header("value_bis");
    let ignoring_public_header = HashSet::from(["public_header".to_string()]);
    assert_eq!(
        first.to_sha256(&ignoring_public_header, false),
        second.to_sha256(&ignoring_public_header, false)
    );

    // With nothing ignored, the differing values must show up in the hash.
    let first = request_with_public_header("value");
    let second = request_with_public_header("value_bis");
    let nothing_ignored = HashSet::new();
    assert_ne!(
        first.to_sha256(&nothing_ignored, false),
        second.to_sha256(&nothing_ignored, false)
    );
}
#[test]
fn test_subgraph_request_hash_ignore_auth_context() {
    use serde_json_bytes::json;

    // Builds an otherwise empty request and stores `claims` as JWT claims in its context.
    let request_with_claims = |claims: Value| {
        let request = Request::fake_builder()
            .subgraph_request(
                http::Request::builder()
                    .body(graphql::Request::default())
                    .unwrap(),
            )
            .build();
        request
            .context
            .insert(APOLLO_AUTHENTICATION_JWT_CLAIMS, claims)
            .expect("insert JWT claims");
        request
    };

    let req_with_claims_a = request_with_claims(json!({"sub": "user-a"}));
    let req_with_claims_b = request_with_claims(json!({"sub": "user-b"}));
    let no_ignored_headers = HashSet::new();

    let hash_a = req_with_claims_a.to_sha256(&no_ignored_headers, false);
    let hash_b = req_with_claims_b.to_sha256(&no_ignored_headers, false);
    assert_ne!(
        hash_a, hash_b,
        "requests with different JWT claims must hash differently by default"
    );

    let hash_a_no_auth = req_with_claims_a.to_sha256(&no_ignored_headers, true);
    let hash_b_no_auth = req_with_claims_b.to_sha256(&no_ignored_headers, true);
    assert_eq!(
        hash_a_no_auth, hash_b_no_auth,
        "requests with different JWT claims must hash identically when ignore_auth_context is true"
    );
}
#[test]
fn test_clone_does_not_copy_arbitrary_subgraph_request_extensions() {
    // Marker extension type that `Request::clone` knows nothing about.
    #[derive(Clone, PartialEq, Debug)]
    struct ShouldNotSurviveClone(u32);

    let http_request = http::Request::builder()
        .body(graphql::Request::default())
        .unwrap();
    let mut req = Request::fake_builder()
        .subgraph_request(http_request)
        .build();
    req.subgraph_request
        .extensions_mut()
        .insert(ShouldNotSurviveClone(42));

    let cloned = req.clone();
    let leaked = cloned
        .subgraph_request
        .extensions()
        .get::<ShouldNotSurviveClone>();
    assert!(
        leaked.is_none(),
        "arbitrary extension types must not be copied when SubgraphRequest is cloned"
    );
}
#[test]
fn test_subgraph_request_hash_no_delimiter_collision() {
    // Builds a request carrying exactly the given headers, added in slice order.
    let request_with_headers = |headers: &[(&'static str, &'static str)]| {
        let mut builder = http::Request::builder();
        for (name, value) in headers.iter().copied() {
            builder = builder.header(name, value);
        }
        Request::fake_builder()
            .subgraph_request(builder.body(graphql::Request::default()).unwrap())
            .build()
    };

    // Naively concatenated, both header sets read "xyxy".
    let req_two_headers = request_with_headers(&[("x", "y"), ("xy", "")]);
    let req_one_header = request_with_headers(&[("x", "yxy")]);

    let no_ignored_headers = HashSet::new();
    let hash_two = req_two_headers.to_sha256(&no_ignored_headers, false);
    let hash_one = req_one_header.to_sha256(&no_ignored_headers, false);
    assert_ne!(
        hash_two, hash_one,
        "header pairs must be delimited so concatenations cannot collide"
    );
}
#[test]
fn test_subgraph_request_hash_non_ascii_value_distinguishable() {
    // Builds a request whose `x-custom` header holds the given raw bytes.
    let request_with_raw_value = |raw: &[u8]| {
        let value = http::HeaderValue::from_bytes(raw).unwrap();
        Request::fake_builder()
            .subgraph_request(
                http::Request::builder()
                    .header("x-custom", value)
                    .body(graphql::Request::default())
                    .unwrap(),
            )
            .build()
    };

    // Two different non-ASCII byte sequences.
    let req_a = request_with_raw_value(&[0xC3, 0xA9]);
    let req_b = request_with_raw_value(&[0xC3, 0xB1]);

    let no_ignored_headers = HashSet::new();
    let hash_a = req_a.to_sha256(&no_ignored_headers, false);
    let hash_b = req_b.to_sha256(&no_ignored_headers, false);
    assert_ne!(
        hash_a, hash_b,
        "non-ASCII header values must not be collapsed to a single sentinel"
    );
}
#[test]
fn test_subgraph_request_hash_variables_order_independence() {
    use serde_json_bytes::json;

    // Inserts the entries in the given order, then wraps the map in a fake request.
    let request_with_variables = |entries: [(&'static str, Value); 3]| {
        let mut variables: JsonMap<ByteString, Value> = JsonMap::new();
        for (name, value) in entries {
            variables.insert(name, value);
        }
        Request::fake_builder()
            .subgraph_request(
                http::Request::builder()
                    .body(graphql::Request::builder().variables(variables).build())
                    .unwrap(),
            )
            .build()
    };

    let req_a = request_with_variables([("a", json!(1)), ("b", json!(2)), ("c", json!(3))]);
    let req_b = request_with_variables([("c", json!(3)), ("a", json!(1)), ("b", json!(2))]);

    let no_ignored_headers = HashSet::new();
    let hash_a = req_a.to_sha256(&no_ignored_headers, false);
    let hash_b = req_b.to_sha256(&no_ignored_headers, false);
    assert_eq!(
        hash_a, hash_b,
        "two requests with the same variables in different insertion orders must hash identically"
    );
}
#[test]
fn test_subgraph_request_hash_variables_no_delimiter_collision() {
    use serde_json_bytes::json;

    // Inserts the entries in the given order, then wraps the map in a fake request.
    let request_with_variables = |entries: Vec<(&'static str, Value)>| {
        let mut variables: JsonMap<ByteString, Value> = JsonMap::new();
        for (name, value) in entries {
            variables.insert(name, value);
        }
        Request::fake_builder()
            .subgraph_request(
                http::Request::builder()
                    .body(graphql::Request::builder().variables(variables).build())
                    .unwrap(),
            )
            .build()
    };

    // Two entries on one side, a single entry with the glued-together key on the other.
    let req_two = request_with_variables(vec![("key", json!(1)), ("value2", json!(null))]);
    let req_one = request_with_variables(vec![("key1value2", json!(null))]);

    let no_ignored_headers = HashSet::new();
    let hash_two = req_two.to_sha256(&no_ignored_headers, false);
    let hash_one = req_one.to_sha256(&no_ignored_headers, false);
    assert_ne!(
        hash_two, hash_one,
        "variable pairs must be delimited so concatenations cannot collide"
    );
}
#[test]
fn test_subgraph_request_hash_no_cross_section_collision_variables_vs_extensions() {
    use serde_json_bytes::json;

    // The very same entry, placed once in `variables` and once in `extensions`.
    let mut as_variables = JsonMap::new();
    as_variables.insert("k", json!(1));
    let mut as_extensions = JsonMap::new();
    as_extensions.insert("k", json!(1));

    let variables_body = graphql::Request::builder().variables(as_variables).build();
    let extensions_body = graphql::Request::builder()
        .extensions(as_extensions)
        .build();

    let req_vars_only = Request::fake_builder()
        .subgraph_request(http::Request::builder().body(variables_body).unwrap())
        .build();
    let req_exts_only = Request::fake_builder()
        .subgraph_request(http::Request::builder().body(extensions_body).unwrap())
        .build();

    let no_ignored_headers = HashSet::new();
    let hash_vars = req_vars_only.to_sha256(&no_ignored_headers, false);
    let hash_exts = req_exts_only.to_sha256(&no_ignored_headers, false);
    assert_ne!(
        hash_vars, hash_exts,
        "the variables and extensions sections must be domain-separated so identical \
         entries in different sections cannot collide"
    );
}
#[test]
fn test_subgraph_request_hash_no_cross_section_collision_query_vs_operation_name() {
    // Builds a request from an operation name and a query string.
    let request_with_operation = |operation_name: &'static str, query: &'static str| {
        let body = graphql::Request::builder()
            .operation_name(operation_name)
            .query(query)
            .build();
        Request::fake_builder()
            .subgraph_request(http::Request::builder().body(body).unwrap())
            .build()
    };

    // Naively concatenated, both pairs read "ABCD".
    let req_a = request_with_operation("AB", "CD");
    let req_b = request_with_operation("ABC", "D");

    let no_ignored_headers = HashSet::new();
    let hash_a = req_a.to_sha256(&no_ignored_headers, false);
    let hash_b = req_b.to_sha256(&no_ignored_headers, false);
    assert_ne!(
        hash_a, hash_b,
        "operation_name and query must be domain-separated so concatenations cannot collide"
    );
}
#[test]
fn test_subgraph_request_hash_extensions_order_independence() {
    use serde_json_bytes::json;

    // Inserts the entries in the given order, then wraps the map in a fake request.
    let request_with_extensions = |entries: [(&'static str, Value); 2]| {
        let mut extensions: JsonMap<ByteString, Value> = JsonMap::new();
        for (name, value) in entries {
            extensions.insert(name, value);
        }
        Request::fake_builder()
            .subgraph_request(
                http::Request::builder()
                    .body(graphql::Request::builder().extensions(extensions).build())
                    .unwrap(),
            )
            .build()
    };

    let req_a = request_with_extensions([("alpha", json!("x")), ("beta", json!("y"))]);
    let req_b = request_with_extensions([("beta", json!("y")), ("alpha", json!("x"))]);

    let no_ignored_headers = HashSet::new();
    let hash_a = req_a.to_sha256(&no_ignored_headers, false);
    let hash_b = req_b.to_sha256(&no_ignored_headers, false);
    assert_eq!(
        hash_a, hash_b,
        "two requests with the same extensions in different insertion orders must hash identically"
    );
}
#[test]
fn test_subgraph_request_hash_header_order_independence() {
    // Builds a request carrying exactly the given headers, added in slice order.
    let request_with_headers = |headers: &[(&'static str, &'static str)]| {
        let mut builder = http::Request::builder();
        for (name, value) in headers.iter().copied() {
            builder = builder.header(name, value);
        }
        Request::fake_builder()
            .subgraph_request(builder.body(graphql::Request::default()).unwrap())
            .build()
    };

    let req_a = request_with_headers(&[("x-a", "1"), ("x-b", "2"), ("x-c", "3")]);
    let req_b = request_with_headers(&[("x-c", "3"), ("x-a", "1"), ("x-b", "2")]);

    let no_ignored_headers = HashSet::new();
    let hash_a = req_a.to_sha256(&no_ignored_headers, false);
    let hash_b = req_b.to_sha256(&no_ignored_headers, false);
    assert_eq!(
        hash_a, hash_b,
        "two requests with the same headers in different insertion orders must hash identically"
    );
}
}