mod layer;
mod limited;
use std::error::Error;
use async_trait::async_trait;
use bytesize::ByteSize;
use http::StatusCode;
pub(crate) use layer::BodyLimitControl;
use schemars::JsonSchema;
use serde::Deserialize;
use serde::Serialize;
use tower::BoxError;
use tower::ServiceBuilder;
use tower::ServiceExt;
use crate::Context;
use crate::configuration::connector::ConnectorConfiguration;
use crate::configuration::subgraph::SubgraphConfiguration;
use crate::graphql;
use crate::layers::ServiceBuilderExt;
use crate::plugin::PluginInit;
use crate::plugin::PluginPrivate;
use crate::plugins::limits::layer::BodyLimitError;
use crate::plugins::limits::layer::RequestBodyLimitLayer;
use crate::services::SubgraphRequest;
use crate::services::connector;
use crate::services::router;
use crate::services::router::BoxService;
use crate::services::subgraph;
// Top-level configuration of the limits plugin, split into one section per
// kind of service the plugin hooks into.
//
// Unknown keys are rejected (`deny_unknown_fields`) and any missing section
// falls back to its `Default` value (container-level `#[serde(default)]`).
//
// NOTE(review): plain `//` comments are used on purpose. This type derives
// `JsonSchema`, and schemars copies `///` doc comments into the generated
// schema as descriptions, which would change the published schema.
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema)]
#[serde(deny_unknown_fields, default)]
#[schemars(rename = "LimitsConfig")]
pub(crate) struct Config {
// Router-level limits. Within this file only `http_max_request_bytes` is read
// (by `router_service`); the other fields are presumably consumed elsewhere.
pub(crate) router: RouterLimitsConfig,
// Subgraph limits: an `all` default plus per-subgraph entries in `subgraphs`,
// resolved by `subgraph_response_size_limit`.
pub(crate) subgraph: SubgraphConfiguration<SubgraphLimits>,
// Connector limits: an `all` default plus per-source entries in `sources`,
// resolved by `connector_response_size_limit`.
pub(crate) connector: ConnectorConfiguration<ConnectorLimits>,
}
// Router-level limits.
//
// Missing keys take the values from the hand-written `Default` impl for this
// type (container-level `#[serde(default)]`); unknown keys are rejected.
//
// NOTE(review): plain `//` comments are used on purpose, because schemars
// would copy `///` doc comments into the generated JSON schema.
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)]
#[serde(deny_unknown_fields, default)]
#[schemars(rename = "RouterLimitsConfig")]
pub(crate) struct RouterLimitsConfig {
// Optional operation limits, all `None` by default. They are not read in this
// file. NOTE(review): presumably enforced during query analysis elsewhere —
// confirm the exact semantics against the consumer.
pub(crate) max_depth: Option<u32>,
pub(crate) max_height: Option<u32>,
pub(crate) max_root_fields: Option<u32>,
pub(crate) max_aliases: Option<u32>,
// Defaults to `false`. NOTE(review): presumably downgrades violations of the
// operation limits above to warnings — confirm.
pub(crate) warn_only: bool,
// Defaults to 500. Not read in this file.
pub(crate) parser_max_recursion: usize,
// Defaults to 15_000. Not read in this file.
pub(crate) parser_max_tokens: usize,
// Defaults to 2_000_000. Passed to `RequestBodyLimitLayer::new` by
// `router_service` as the request body limit.
pub(crate) http_max_request_bytes: usize,
// `None` by default. Not read in this file.
pub(crate) http1_max_request_headers: Option<usize>,
// `None` by default. Described in the JSON schema as an optional string
// rather than as `ByteSize`.
#[schemars(with = "Option<String>", default)]
pub(crate) http1_max_request_buf_size: Option<ByteSize>,
// `None` by default. Described in the JSON schema as an optional string
// rather than as `ByteSize`.
#[schemars(with = "Option<String>", default)]
pub(crate) http2_max_headers_list_bytes: Option<ByteSize>,
// Defaults to `true`. Not read in this file.
pub(crate) introspection_max_depth: bool,
}
impl Default for RouterLimitsConfig {
fn default() -> Self {
Self {
max_depth: None,
max_height: None,
max_root_fields: None,
max_aliases: None,
warn_only: false,
http_max_request_bytes: 2_000_000,
http1_max_request_headers: None,
http1_max_request_buf_size: None,
http2_max_headers_list_bytes: None,
parser_max_tokens: 15_000,
parser_max_recursion: 500,
introspection_max_depth: true,
}
}
}
// Limits that can be set for all subgraphs or for a single subgraph.
//
// NOTE(review): plain `//` comments are used on purpose, because schemars
// would copy `///` doc comments into the generated JSON schema.
#[derive(Debug, Default, Clone, Deserialize, Serialize, JsonSchema)]
#[serde(deny_unknown_fields, default)]
#[schemars(rename = "SubgraphLimits")]
pub(crate) struct SubgraphLimits {
// Optional response size limit; `None` (the derived default) means this entry
// sets no limit. Described in the JSON schema as an optional string.
#[schemars(with = "Option<String>", default)]
pub(crate) http_max_response_size: Option<ByteSize>,
}
/// Resolved subgraph response size limit, in bytes.
///
/// `subgraph_service` inserts a value of this type into the request context
/// extensions when a limit is configured for the subgraph.
#[derive(Clone, Copy, Debug, Ord, PartialOrd, PartialEq, Eq)]
pub(crate) struct SubgraphResponseSizeLimit(pub usize);
// Limits that can be set for all connector sources or for a single source.
//
// NOTE(review): plain `//` comments are used on purpose, because schemars
// would copy `///` doc comments into the generated JSON schema.
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema)]
#[serde(deny_unknown_fields, default)]
#[schemars(rename = "ConnectorLimits")]
pub(crate) struct ConnectorLimits {
// Optional response size limit; `None` (the derived default) means this entry
// sets no limit. Described in the JSON schema as an optional string.
#[schemars(with = "Option<String>", default)]
pub(crate) http_max_response_size: Option<ByteSize>,
}
/// Resolved connector response size limit, in bytes.
///
/// `connector_request_service` inserts a value of this type into the request
/// context extensions when a limit is configured for the connector source.
#[derive(Clone, Copy, Debug, Ord, PartialOrd, PartialEq, Eq)]
pub(crate) struct ConnectorResponseSizeLimit(pub usize);
impl Config {
    /// Resolves the response size limit that applies to `subgraph_name`.
    ///
    /// A value set on the subgraph's own entry takes precedence; otherwise the
    /// `all` value is used. Returns `None` when neither is set, or when the
    /// configured byte count does not fit in `usize`.
    fn subgraph_response_size_limit(
        &self,
        subgraph_name: &str,
    ) -> Option<SubgraphResponseSizeLimit> {
        let configured = self
            .subgraph
            .subgraphs
            .get(subgraph_name)
            .and_then(|limits| limits.http_max_response_size)
            .or(self.subgraph.all.http_max_response_size)?;
        let bytes = usize::try_from(configured.as_u64()).ok()?;
        Some(SubgraphResponseSizeLimit(bytes))
    }

    /// Resolves the response size limit that applies to the connector source
    /// `source_name`.
    ///
    /// A value set on the source's own entry takes precedence; otherwise the
    /// `all` value is used. Returns `None` when neither is set, or when the
    /// configured byte count does not fit in `usize`.
    fn connector_response_size_limit(
        &self,
        source_name: &str,
    ) -> Option<ConnectorResponseSizeLimit> {
        let configured = self
            .connector
            .sources
            .get(source_name)
            .and_then(|limits| limits.http_max_response_size)
            .or(self.connector.all.http_max_response_size)?;
        let bytes = usize::try_from(configured.as_u64()).ok()?;
        Some(ConnectorResponseSizeLimit(bytes))
    }
}
/// Plugin that applies the configured limits to the router, subgraph and
/// connector services.
struct LimitsPlugin {
/// Configuration captured when the plugin is created and read each time a
/// service is wrapped.
config: Config,
}
#[async_trait]
impl PluginPrivate for LimitsPlugin {
    type Config = Config;

    /// Creates the plugin, taking ownership of the configuration carried by `init`.
    async fn new(init: PluginInit<Self::Config>) -> Result<Self, BoxError>
    where
        Self: Sized,
    {
        let config = init.config;
        Ok(LimitsPlugin { config })
    }

    /// Wraps the router service with the request body size limit.
    ///
    /// `ServiceBuilder` applies layers in the order they are listed, so the
    /// error-mapping step is outermost: it clones the request context up front
    /// and uses it to turn body-limit failures into GraphQL error responses.
    /// The body limit layer sits between two pairs of `Into` conversions.
    /// NOTE(review): those conversions presumably switch between the router
    /// request/response types and the types the limit layer works on — confirm.
    fn router_service(&self, service: BoxService) -> BoxService {
        let max_request_bytes = self.config.router.http_max_request_bytes;
        ServiceBuilder::new()
            .map_future_with_request_data(
                |request: &router::Request| request.context.clone(),
                |ctx, response_future| async {
                    Self::map_error_to_graphql(response_future.await, ctx)
                },
            )
            .map_request(Into::into)
            .map_response(Into::into)
            .layer(RequestBodyLimitLayer::new(max_request_bytes))
            .map_request(Into::into)
            .map_response(Into::into)
            .service(service)
            .boxed()
    }

    /// Stores the response size limit for subgraph `name` in each request's
    /// context extensions. When no limit is configured, the service is
    /// returned unchanged.
    fn subgraph_service(&self, name: &str, service: subgraph::BoxService) -> subgraph::BoxService {
        let Some(limit) = self.config.subgraph_response_size_limit(name) else {
            return service;
        };
        ServiceBuilder::new()
            .map_request(move |request: SubgraphRequest| {
                request
                    .context
                    .extensions()
                    .with_lock(|extensions| extensions.insert(limit));
                request
            })
            .service(service)
            .boxed()
    }

    /// Stores the response size limit for connector source `source_name` in
    /// each request's context extensions. When no limit is configured, the
    /// service is returned unchanged.
    fn connector_request_service(
        &self,
        service: connector::request_service::BoxService,
        source_name: String,
    ) -> connector::request_service::BoxService {
        let Some(limit) = self.config.connector_response_size_limit(&source_name) else {
            return service;
        };
        ServiceBuilder::new()
            .map_request(move |request: connector::request_service::Request| {
                request
                    .context
                    .extensions()
                    .with_lock(|extensions| extensions.insert(limit));
                request
            })
            .service(service)
            .boxed()
    }
}
impl LimitsPlugin {
    /// Converts body-limit failures into a GraphQL "payload too large" response.
    ///
    /// * An `Ok` response with status 413 is replaced by the GraphQL error response.
    /// * An `Err` whose root cause (the end of the `source()` chain) is a
    ///   `BodyLimitError` becomes that same `Ok` response.
    /// * Any other response or error is returned untouched.
    fn map_error_to_graphql(
        resp: Result<router::Response, BoxError>,
        ctx: Context,
    ) -> Result<router::Response, BoxError> {
        let error = match resp {
            Ok(response) => {
                let too_large = response.response.status() == StatusCode::PAYLOAD_TOO_LARGE;
                return Ok(if too_large {
                    BodyLimitError::PayloadTooLarge.into_response(ctx)
                } else {
                    response
                });
            }
            Err(error) => error,
        };

        // Walk to the innermost error, since the limit error may be wrapped.
        let mut root_cause: &dyn Error = error.as_ref();
        while let Some(inner) = root_cause.source() {
            root_cause = inner;
        }

        if root_cause.downcast_ref::<BodyLimitError>().is_some() {
            Ok(BodyLimitError::PayloadTooLarge.into_response(ctx))
        } else {
            Err(error)
        }
    }
}
impl BodyLimitError {
fn into_response(self, ctx: Context) -> router::Response {
match self {
BodyLimitError::PayloadTooLarge => router::Response::error_builder()
.error(
graphql::Error::builder()
.message(self.to_string())
.extension_code("INVALID_GRAPHQL_REQUEST")
.extension("details", self.to_string())
.build(),
)
.status_code(StatusCode::PAYLOAD_TOO_LARGE)
.context(ctx)
.build()
.unwrap(),
}
}
}
// Registers `LimitsPlugin` under "apollo" / "limits". The tests in this file
// configure it through a top-level `limits` key. NOTE(review): the macro is
// defined elsewhere in the crate — confirm the meaning of the two strings.
register_private_plugin!("apollo", "limits", LimitsPlugin);
/// Test-only shortcut: builds a `Config` from a subgraph section, leaving
/// every other section at its default.
#[cfg(test)]
impl From<SubgraphConfiguration<SubgraphLimits>> for Config {
    fn from(subgraph: SubgraphConfiguration<SubgraphLimits>) -> Self {
        Config {
            subgraph,
            ..Default::default()
        }
    }
}
/// Test-only shortcut: builds a `Config` from a connector section, leaving
/// every other section at its default.
#[cfg(test)]
impl From<ConnectorConfiguration<ConnectorLimits>> for Config {
    fn from(connector: ConnectorConfiguration<ConnectorLimits>) -> Self {
        Config {
            connector,
            ..Default::default()
        }
    }
}
#[cfg(test)]
mod test {
use http::StatusCode;
use tower::BoxError;
use crate::Context;
use crate::plugins::limits::LimitsPlugin;
use crate::plugins::limits::SubgraphResponseSizeLimit;
use crate::plugins::limits::layer::BodyLimitControl;
use crate::plugins::test::PluginTestHarness;
use crate::services::router;
/// A 14-byte body sent without a `Content-Length` header must fail while the
/// inner service reads it, and the failure must surface as a 413 GraphQL error.
/// (The fixture limit is 10 bytes, as asserted in `test_limits_dynamic_update`.)
#[tokio::test]
async fn test_body_content_length_limit_exceeded() {
    let harness = plugin().await;
    let request = router::Request::fake_builder()
        .body(router::body::from_bytes("This is a test"))
        .build()
        .unwrap();

    let result = harness
        .router_service(|r| async {
            let body = r.router_request.into_body();
            let _ = router::body::into_bytes(body).await?;
            panic!("should have failed to read stream")
        })
        .call(request)
        .await;

    assert!(result.is_ok());
    let response = result.unwrap().response;
    assert_eq!(response.status(), StatusCode::PAYLOAD_TOO_LARGE);

    let bytes = router::body::into_bytes(response.into_body())
        .await
        .unwrap();
    let text = String::from_utf8(bytes.to_vec()).unwrap();
    assert_eq!(
        text,
        "{\"errors\":[{\"message\":\"Request body payload too large\",\"extensions\":{\"details\":\"Request body payload too large\",\"code\":\"INVALID_GRAPHQL_REQUEST\"}}]}"
    );
}
/// An empty body stays within the limit: the inner service can read it and
/// its response is passed through unchanged.
#[tokio::test]
async fn test_body_content_length_limit_ok() {
    let harness = plugin().await;
    let request = router::Request::fake_builder()
        .body(router::body::empty())
        .build()
        .unwrap();

    let result = harness
        .router_service(|r| async {
            let body = r.router_request.into_body();
            let read = router::body::into_bytes(body).await;
            assert!(read.is_ok());
            Ok(router::Response::fake_builder().build().unwrap())
        })
        .call(request)
        .await;

    assert!(result.is_ok());
    let response = result.unwrap().response;
    assert_eq!(response.status(), StatusCode::OK);

    let bytes = router::body::into_bytes(response.into_body())
        .await
        .unwrap();
    let text = String::from_utf8(bytes.to_vec()).unwrap();
    assert_eq!(text, "{}");
}
/// A `Content-Length` header above the limit must be rejected before the
/// inner service is called, and reported as a 413 GraphQL error.
#[tokio::test]
async fn test_header_content_length_limit_exceeded() {
    let harness = plugin().await;
    let request = router::Request::fake_builder()
        .header("Content-Length", "100")
        .body(router::body::empty())
        .build()
        .unwrap();

    let result = harness
        .router_service(|_| async { panic!("should have rejected request") })
        .call(request)
        .await;

    assert!(result.is_ok());
    let response = result.unwrap().response;
    assert_eq!(response.status(), StatusCode::PAYLOAD_TOO_LARGE);

    let bytes = router::body::into_bytes(response.into_body())
        .await
        .unwrap();
    let text = String::from_utf8(bytes.to_vec()).unwrap();
    assert_eq!(
        text,
        "{\"errors\":[{\"message\":\"Request body payload too large\",\"extensions\":{\"details\":\"Request body payload too large\",\"code\":\"INVALID_GRAPHQL_REQUEST\"}}]}"
    );
}
/// A `Content-Length` header within the limit lets the request reach the
/// inner service, whose response is passed through unchanged.
#[tokio::test]
async fn test_header_content_length_limit_ok() {
    let harness = plugin().await;
    let request = router::Request::fake_builder()
        .header("Content-Length", "5")
        .body(router::body::empty())
        .build()
        .unwrap();

    let result = harness
        .router_service(|_| async { Ok(router::Response::fake_builder().build().unwrap()) })
        .call(request)
        .await;

    assert!(result.is_ok());
    let response = result.unwrap().response;
    assert_eq!(response.status(), StatusCode::OK);

    let bytes = router::body::into_bytes(response.into_body())
        .await
        .unwrap();
    let text = String::from_utf8(bytes.to_vec()).unwrap();
    assert_eq!(text, "{}");
}
/// An error that is unrelated to the body limit must be returned as an error,
/// not converted into a response.
#[tokio::test]
async fn test_non_limit_error_passthrough() {
    let harness = plugin().await;
    let request = router::Request::fake_builder()
        .body(router::body::empty())
        .build()
        .unwrap();

    let result = harness
        .router_service(|_| async { Err(BoxError::from("error")) })
        .call(request)
        .await;

    assert!(result.is_err());
}
/// The inner service can raise the limit through `BodyLimitControl` before
/// reading the body: starting from 10 bytes, raising it to 100 lets the
/// 14-byte body through and leaves 86 bytes remaining.
#[tokio::test]
async fn test_limits_dynamic_update() {
    let harness = plugin().await;
    let request = router::Request::fake_builder()
        .body(router::body::from_bytes("This is a test"))
        .build()
        .unwrap();

    let result = harness
        .router_service(|mut r: router::Request| async move {
            let control = r
                .router_request
                .extensions_mut()
                .get::<BodyLimitControl>()
                .expect("body limit control must have been set")
                .clone();
            assert_eq!(control.remaining(), 10);
            assert_eq!(control.limit(), 10);

            control.update_limit(100);
            let body = r.router_request.into_body();
            let _ = router::body::into_bytes(body).await?;
            assert_eq!(control.remaining(), 86);

            Ok(router::Response::fake_builder().build().unwrap())
        })
        .call(request)
        .await;

    assert!(result.is_ok());
    let response = result.unwrap().response;
    assert_eq!(response.status(), StatusCode::OK);

    let bytes = router::body::into_bytes(response.into_body())
        .await
        .unwrap();
    let text = String::from_utf8(bytes.to_vec()).unwrap();
    assert_eq!(text, "{}");
}
/// Builds a test harness for `LimitsPlugin` configured from
/// `fixtures/content_length_limit.router.yaml`.
async fn plugin() -> PluginTestHarness<LimitsPlugin> {
    PluginTestHarness::<LimitsPlugin>::builder()
        .config(include_str!("fixtures/content_length_limit.router.yaml"))
        .build()
        .await
        .expect("test harness")
}
/// Unit tests for `Config::subgraph_response_size_limit`.
mod subgraph_response_limit {
    use bytesize::ByteSize;

    use crate::configuration::subgraph::SubgraphConfiguration;
    use crate::plugins::limits::Config;
    use crate::plugins::limits::SubgraphLimits;
    use crate::plugins::limits::SubgraphResponseSizeLimit;

    /// With nothing configured, no limit is resolved.
    #[test]
    fn get_response_limit_no_config() {
        let config = Config::from(SubgraphConfiguration::<SubgraphLimits>::default());
        assert_eq!(config.subgraph_response_size_limit("products"), None);
    }

    /// The `all` value applies to every subgraph.
    #[test]
    fn get_response_limit_all() {
        let mut subgraphs = SubgraphConfiguration::<SubgraphLimits>::default();
        subgraphs.all.http_max_response_size = Some(ByteSize::kb(1));
        let config = Config::from(subgraphs);

        let expected = Some(SubgraphResponseSizeLimit(1000));
        assert_eq!(config.subgraph_response_size_limit("products"), expected);
        assert_eq!(config.subgraph_response_size_limit("reviews"), expected);
    }

    /// A per-subgraph value applies only to that subgraph.
    #[test]
    fn get_response_limit_subgraph_specific() {
        let mut subgraphs = SubgraphConfiguration::<SubgraphLimits>::default();
        let products = SubgraphLimits {
            http_max_response_size: Some(ByteSize::b(512)),
        };
        subgraphs.subgraphs.insert(String::from("products"), products);
        let config = Config::from(subgraphs);

        assert_eq!(
            config.subgraph_response_size_limit("products"),
            Some(SubgraphResponseSizeLimit(512))
        );
        assert_eq!(config.subgraph_response_size_limit("reviews"), None);
    }

    /// A per-subgraph value overrides `all`; an entry without a value falls
    /// back to `all`.
    #[test]
    fn get_response_limit_subgraph_overrides_all() {
        let mut subgraphs = SubgraphConfiguration::<SubgraphLimits>::default();
        subgraphs.all.http_max_response_size = Some(ByteSize::kib(1));
        let products = SubgraphLimits {
            http_max_response_size: Some(ByteSize::b(500)),
        };
        subgraphs.subgraphs.insert(String::from("products"), products);
        let reviews = SubgraphLimits {
            http_max_response_size: None,
        };
        subgraphs.subgraphs.insert(String::from("reviews"), reviews);
        let config = Config::from(subgraphs);

        assert_eq!(
            config.subgraph_response_size_limit("products"),
            Some(SubgraphResponseSizeLimit(500))
        );
        assert_eq!(
            config.subgraph_response_size_limit("reviews"),
            Some(SubgraphResponseSizeLimit(1024))
        );
    }
}
/// Unit tests for `Config::connector_response_size_limit`.
mod connector_response_limit {
    use bytesize::ByteSize;

    use crate::configuration::connector::ConnectorConfiguration;
    use crate::plugins::limits::Config;
    use crate::plugins::limits::ConnectorLimits;
    use crate::plugins::limits::ConnectorResponseSizeLimit;

    /// With nothing configured, no limit is resolved.
    #[test]
    fn get_response_limit_no_config() {
        let config = Config::from(ConnectorConfiguration::<ConnectorLimits>::default());
        assert_eq!(config.connector_response_size_limit("products.rest"), None);
    }

    /// The `all` value applies to every connector source.
    #[test]
    fn get_response_limit_all() {
        let mut connectors = ConnectorConfiguration::<ConnectorLimits>::default();
        connectors.all.http_max_response_size = Some(ByteSize::kb(1));
        let config = Config::from(connectors);

        let expected = Some(ConnectorResponseSizeLimit(1000));
        assert_eq!(
            config.connector_response_size_limit("products.rest"),
            expected
        );
        assert_eq!(
            config.connector_response_size_limit("reviews.api"),
            expected
        );
    }

    /// A per-source value applies only to that source.
    // Renamed from `..._subgraph_specific`: the test exercises connector
    // sources, not subgraphs.
    #[test]
    fn get_response_limit_source_specific() {
        let mut connectors = ConnectorConfiguration::<ConnectorLimits>::default();
        let products = ConnectorLimits {
            http_max_response_size: Some(ByteSize::b(512)),
        };
        connectors
            .sources
            .insert(String::from("products.rest"), products);
        let config = Config::from(connectors);

        assert_eq!(
            config.connector_response_size_limit("products.rest"),
            Some(ConnectorResponseSizeLimit(512))
        );
        assert_eq!(config.connector_response_size_limit("reviews.api"), None);
    }

    /// A per-source value overrides `all`; an entry without a value falls
    /// back to `all`.
    // Renamed from `..._subgraph_overrides_all`: the test exercises connector
    // sources, not subgraphs.
    #[test]
    fn get_response_limit_source_overrides_all() {
        let mut connectors = ConnectorConfiguration::<ConnectorLimits>::default();
        connectors.all.http_max_response_size = Some(ByteSize::kib(1));
        let products = ConnectorLimits {
            http_max_response_size: Some(ByteSize::b(500)),
        };
        connectors
            .sources
            .insert(String::from("products.rest"), products);
        let reviews = ConnectorLimits {
            http_max_response_size: None,
        };
        connectors
            .sources
            .insert(String::from("reviews.api"), reviews);
        let config = Config::from(connectors);

        assert_eq!(
            config.connector_response_size_limit("products.rest"),
            Some(ConnectorResponseSizeLimit(500))
        );
        assert_eq!(
            config.connector_response_size_limit("reviews.api"),
            Some(ConnectorResponseSizeLimit(1024))
        );
    }
}
/// Builds a minimal connector request carrying `ctx`, for tests that only
/// inspect the request context.
///
/// The connector, response key and HTTP request are fixed placeholder values.
fn make_connector_request(
    ctx: Context,
) -> crate::services::connector::request_service::Request {
    use std::sync::Arc;

    use apollo_compiler::name;
    use apollo_federation::connectors::ConnectId;
    use apollo_federation::connectors::ConnectSpec;
    use apollo_federation::connectors::Connector;
    use apollo_federation::connectors::HttpJsonTransport;
    use apollo_federation::connectors::JSONSelection;
    use apollo_federation::connectors::runtime::http_json_transport::HttpRequest;
    use apollo_federation::connectors::runtime::key::ResponseKey;

    let id = ConnectId::new(
        "subgraph_name".into(),
        None,
        name!(Query),
        name!(hello),
        None,
        0,
    );
    let transport = HttpJsonTransport {
        source_template: "http://localhost/api".parse().ok(),
        connect_template: "/path".parse().unwrap(),
        ..Default::default()
    };
    let connector = Connector {
        spec: ConnectSpec::V0_1,
        schema_subtypes_map: Default::default(),
        id,
        transport: Some(transport),
        selection: JSONSelection::parse("$.data").unwrap(),
        entity_resolver: None,
        config: Default::default(),
        max_requests: None,
        batch_settings: None,
        request_headers: Default::default(),
        response_headers: Default::default(),
        request_variable_keys: Default::default(),
        response_variable_keys: Default::default(),
        error_settings: Default::default(),
        label: "test label".into(),
    };

    let key = ResponseKey::RootField {
        name: "hello".to_string(),
        inputs: Default::default(),
        selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
    };

    let transport_request = HttpRequest {
        inner: http::Request::builder().body("{}".to_string()).unwrap(),
        debug: Default::default(),
    };

    let supergraph_request = http::Request::builder()
        .body(crate::graphql::Request::builder().build())
        .expect("valid request");

    crate::services::connector::request_service::Request {
        context: ctx,
        connector: Arc::new(connector),
        transport_request: transport_request.into(),
        key,
        mapping_problems: Default::default(),
        supergraph_request: Arc::new(supergraph_request),
        operation: Default::default(),
    }
}
/// Builds a placeholder connector response for `req`: it reuses the request's
/// context and response key, reports a default HTTP response head as the
/// transport result, and maps to `null` data with no problems.
fn make_stub_connector_response(
    req: &crate::services::connector::request_service::Request,
) -> crate::services::connector::request_service::Response {
    use apollo_federation::connectors::runtime::http_json_transport::HttpResponse;
    use apollo_federation::connectors::runtime::http_json_transport::TransportResponse;
    use apollo_federation::connectors::runtime::responses::MappedResponse;
    use serde_json_bytes::Value;

    // Only the response head is needed; the unit body is dropped.
    let (head, _) = http::Response::builder().body(()).unwrap().into_parts();
    let context = req.context.clone();
    let http_response = HttpResponse { inner: head };

    crate::services::connector::request_service::Response {
        context,
        transport_result: Ok(TransportResponse::Http(http_response)),
        mapped_response: MappedResponse::Data {
            data: Value::Null,
            key: req.key.clone(),
            problems: vec![],
        },
    }
}
/// With `limits.connector.all.http_max_response_size` set to `2kib`, the
/// connector request service must store a 2048-byte
/// `ConnectorResponseSizeLimit` in the request context.
#[tokio::test]
async fn connector_request_service_sets_limit_on_context() {
    use crate::plugins::limits::ConnectorResponseSizeLimit;

    // Each nesting level needs deeper indentation. With every key at the same
    // indent, `connector`, `all` and `http_max_response_size` would parse as
    // siblings under `limits`, and no connector limit would be configured.
    let plugin: PluginTestHarness<LimitsPlugin> = PluginTestHarness::builder()
        .config("limits:\n  connector:\n    all:\n      http_max_response_size: 2kib")
        .build()
        .await
        .expect("test harness");

    let result = plugin
        .call_connector_request_service(
            make_connector_request(Context::new()),
            |req: crate::services::connector::request_service::Request| {
                let limit = req
                    .context
                    .extensions()
                    .with_lock(|e| e.get::<ConnectorResponseSizeLimit>().copied());
                assert_eq!(
                    limit.map(|l| l.0),
                    Some(2048),
                    "limit should be set on context"
                );
                make_stub_connector_response(&req)
            },
        )
        .await;
    assert!(result.is_ok());
}
/// With the fixture configuration, the connector request service must leave
/// the request context without a `ConnectorResponseSizeLimit`.
#[tokio::test]
async fn connector_request_service_no_limit_does_not_set_extension() {
    use crate::plugins::limits::ConnectorResponseSizeLimit;

    let harness = PluginTestHarness::<LimitsPlugin>::builder()
        .config(include_str!("fixtures/content_length_limit.router.yaml"))
        .build()
        .await
        .expect("test harness");
    let request = make_connector_request(Context::new());

    let result = harness
        .call_connector_request_service(
            request,
            |req: crate::services::connector::request_service::Request| {
                let stored = req
                    .context
                    .extensions()
                    .with_lock(|ext| ext.get::<ConnectorResponseSizeLimit>().copied());
                assert!(stored.is_none(), "no limit should be set on context");
                make_stub_connector_response(&req)
            },
        )
        .await;

    assert!(result.is_ok());
}
/// With `limits.subgraph.all.http_max_response_size` set to `1024b`, the
/// subgraph service must store a 1024-byte `SubgraphResponseSizeLimit` in the
/// request context, including for a subgraph that has no entry of its own.
#[tokio::test]
async fn subgraph_service_sets_limit_on_context() {
    // Each nesting level needs deeper indentation. With every key at the same
    // indent, `subgraph`, `all` and `http_max_response_size` would parse as
    // siblings under `limits`, and no subgraph limit would be configured.
    let plugin: PluginTestHarness<LimitsPlugin> = PluginTestHarness::builder()
        .config("limits:\n  subgraph:\n    all:\n      http_max_response_size: 1024b")
        .build()
        .await
        .expect("test harness");

    let result = plugin
        .subgraph_service(
            "products",
            |req: crate::services::SubgraphRequest| async move {
                let limit = req
                    .context
                    .extensions()
                    .with_lock(|e| e.get::<SubgraphResponseSizeLimit>().copied());
                assert_eq!(
                    limit.map(|l| l.0),
                    Some(1024),
                    "limit should be set on context"
                );
                Ok(crate::services::SubgraphResponse::fake_builder()
                    .context(req.context)
                    .build())
            },
        )
        .call_default()
        .await;
    assert!(result.is_ok());
}
/// With the fixture configuration, the subgraph service must leave the
/// request context without a `SubgraphResponseSizeLimit`.
#[tokio::test]
async fn subgraph_service_no_limit_does_not_set_extension() {
    let harness = PluginTestHarness::<LimitsPlugin>::builder()
        .config(include_str!("fixtures/content_length_limit.router.yaml"))
        .build()
        .await
        .expect("test harness");

    let result = harness
        .subgraph_service(
            "products",
            |req: crate::services::SubgraphRequest| async move {
                let stored = req
                    .context
                    .extensions()
                    .with_lock(|ext| ext.get::<SubgraphResponseSizeLimit>().copied());
                assert!(stored.is_none(), "no limit should be set on context");
                Ok(crate::services::SubgraphResponse::fake_builder()
                    .context(req.context)
                    .build())
            },
        )
        .call_default()
        .await;

    assert!(result.is_ok());
}
}