mod deduplication;
pub(crate) mod rate;
pub(crate) mod timeout;
use std::collections::HashMap;
use std::num::NonZeroU64;
use std::sync::Mutex;
use std::time::Duration;
use futures::FutureExt;
use futures::future::BoxFuture;
use http::HeaderValue;
use http::StatusCode;
use http::header::CONTENT_ENCODING;
use schemars::JsonSchema;
use serde::Deserialize;
use tower::BoxError;
use tower::Service;
use tower::ServiceBuilder;
use tower::ServiceExt;
use tower::util::Either;
use self::deduplication::QueryDeduplicationLayer;
use self::rate::RateLimitLayer;
use self::rate::RateLimited;
use self::timeout::Elapsed;
use self::timeout::TimeoutLayer;
use crate::configuration::shared::DnsResolutionStrategy;
use crate::error::ConfigurationError;
use crate::graphql;
use crate::layers::ServiceBuilderExt;
use crate::plugin::Plugin;
use crate::plugin::PluginInit;
use crate::register_plugin;
use crate::services::SubgraphRequest;
use crate::services::http::service::Compression;
use crate::services::subgraph;
use crate::services::supergraph;
/// Timeout applied to router and subgraph requests when none is configured.
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);
/// Fully-qualified plugin name used for registration and plugin lookup.
pub(crate) const APOLLO_TRAFFIC_SHAPING: &str = "apollo.traffic_shaping";
/// Configuration merging: a value set on `self` (the more specific level,
/// e.g. a per-subgraph entry) takes precedence over the same value on the
/// optional `fallback` (e.g. the `all` entry).
trait Merge {
    fn merge(&self, fallback: Option<&Self>) -> Self;
}
// Traffic-shaping options that can be applied to subgraph requests.
// Every field is optional so a per-subgraph entry can be merged over the
// `all` fallback (see the `Merge` impl below).
// Plain `//` comments are used here: `///` doc comments would be picked up
// by the JsonSchema derive and change the generated config schema.
#[derive(PartialEq, Debug, Clone, Deserialize, JsonSchema)]
#[serde(deny_unknown_fields)]
struct Shaping {
    // Deduplicate identical in-flight queries to the same subgraph.
    deduplicate_query: Option<bool>,
    // Compression applied to request bodies sent to the subgraph.
    compression: Option<Compression>,
    // Rate limit shared by all requests to this subgraph.
    global_rate_limit: Option<RateLimitConf>,
    // Request timeout, parsed from a human-readable duration (e.g. "30s").
    #[serde(deserialize_with = "humantime_serde::deserialize", default)]
    #[schemars(with = "String", default)]
    timeout: Option<Duration>,
    // HTTP/2 mode for the subgraph connection.
    experimental_http2: Option<Http2Config>,
    // DNS resolution strategy used when connecting to the subgraph.
    dns_resolution_strategy: Option<DnsResolutionStrategy>,
}
// HTTP/2 configuration for subgraph connections. Serialized in lowercase
// ("enable" / "disable" / "http2only").
#[derive(PartialEq, Default, Debug, Clone, Deserialize, JsonSchema)]
#[serde(rename_all = "lowercase")]
pub(crate) enum Http2Config {
    // HTTP/2 enabled (the default).
    #[default]
    Enable,
    // HTTP/2 disabled.
    Disable,
    // HTTP/2 only — presumably skips HTTP/1.x negotiation; consumed by the
    // HTTP client layer, not in this module.
    Http2Only,
}
impl Merge for Shaping {
    /// Field-by-field merge: any option left unset on `self` is filled in
    /// from `fallback`; options set on `self` always win.
    fn merge(&self, fallback: Option<&Self>) -> Self {
        // Without a fallback there is nothing to merge.
        let Some(fallback) = fallback else {
            return self.clone();
        };
        Shaping {
            deduplicate_query: self.deduplicate_query.or(fallback.deduplicate_query),
            compression: self.compression.or(fallback.compression),
            timeout: self.timeout.or(fallback.timeout),
            global_rate_limit: self
                .global_rate_limit
                .clone()
                .or_else(|| fallback.global_rate_limit.clone()),
            experimental_http2: self
                .experimental_http2
                .clone()
                .or_else(|| fallback.experimental_http2.clone()),
            dns_resolution_strategy: self
                .dns_resolution_strategy
                .clone()
                .or_else(|| fallback.dns_resolution_strategy.clone()),
        }
    }
}
// Per-subgraph traffic-shaping configuration. Currently just the shared
// `Shaping` options, flattened so they appear directly under the subgraph
// key in the YAML config.
#[derive(PartialEq, Debug, Clone, Deserialize, JsonSchema)]
#[serde(deny_unknown_fields)]
struct SubgraphShaping {
    #[serde(flatten)]
    shaping: Shaping,
}
impl Merge for SubgraphShaping {
    /// Delegate the merge to the inner `Shaping`; `self` takes precedence.
    fn merge(&self, fallback: Option<&Self>) -> Self {
        fallback.map_or_else(
            || self.clone(),
            |fb| SubgraphShaping {
                shaping: self.shaping.merge(Some(&fb.shaping)),
            },
        )
    }
}
// Traffic-shaping options applied at the router (supergraph) level.
#[derive(PartialEq, Debug, Clone, Deserialize, JsonSchema)]
#[serde(deny_unknown_fields)]
struct RouterShaping {
    // Rate limit shared by all requests handled by the router.
    global_rate_limit: Option<RateLimitConf>,
    // Request timeout, parsed from a human-readable duration (e.g. "65s").
    #[serde(deserialize_with = "humantime_serde::deserialize", default)]
    #[schemars(with = "String", default)]
    timeout: Option<Duration>,
}
// Top-level configuration for the traffic-shaping plugin.
#[derive(Debug, Clone, Default, Deserialize, JsonSchema)]
#[serde(deny_unknown_fields, default)]
pub(crate) struct Config {
    // Router-level shaping, applied to the whole supergraph request.
    router: Option<RouterShaping>,
    // Fallback shaping applied to every subgraph.
    all: Option<SubgraphShaping>,
    // Per-subgraph overrides keyed by subgraph name; merged over `all`.
    subgraphs: HashMap<String, SubgraphShaping>,
    // Variable-deduplication toggle. Not read in this module — presumably
    // consumed by the query deduplication machinery; verify against callers.
    deduplicate_variables: Option<bool>,
}
// Rate-limit configuration: at most `capacity` requests per `interval`.
#[derive(PartialEq, Debug, Clone, Deserialize, JsonSchema)]
#[serde(deny_unknown_fields)]
struct RateLimitConf {
    // Maximum number of requests admitted per interval (must be non-zero).
    capacity: NonZeroU64,
    // Window over which `capacity` applies, parsed from a human-readable
    // duration (e.g. "100ms").
    #[serde(deserialize_with = "humantime_serde::deserialize")]
    #[schemars(with = "String")]
    interval: Duration,
}
impl Merge for RateLimitConf {
    /// Merge with a fallback. As everywhere else in this module, the value on
    /// `self` (the more specific configuration) takes precedence.
    ///
    /// Bug fix: the previous implementation returned the *fallback*'s
    /// `capacity` and `interval` whenever a fallback was present, inverting
    /// the precedence used by every other `Merge` impl here (a global rate
    /// limit would have overridden a subgraph-specific one). Since both
    /// fields of `RateLimitConf` are mandatory, a fully-specified `self`
    /// always wins outright.
    fn merge(&self, _fallback: Option<&Self>) -> Self {
        self.clone()
    }
}
/// The traffic-shaping plugin: applies timeouts, rate limits, query
/// deduplication and request compression to router and subgraph traffic.
pub(crate) struct TrafficShaping {
    /// Parsed plugin configuration.
    config: Config,
    /// Router-level rate limiter, built once at plugin creation.
    rate_limit_router: Option<RateLimitLayer>,
    /// Lazily-created per-subgraph rate limiters, keyed by subgraph name,
    /// so the limit is shared across clones of a subgraph service.
    rate_limit_subgraphs: Mutex<HashMap<String, RateLimitLayer>>,
}
#[async_trait::async_trait]
impl Plugin for TrafficShaping {
    type Config = Config;

    /// Build the plugin: validate the router-level rate-limit interval (it
    /// must fit in `u64` milliseconds) and construct the router rate limiter
    /// up front. Subgraph limiters are created lazily on first use.
    async fn new(init: PluginInit<Self::Config>) -> Result<Self, BoxError> {
        let router_rate_limit_conf = init
            .config
            .router
            .as_ref()
            .and_then(|r| r.global_rate_limit.as_ref());
        let rate_limit_router = match router_rate_limit_conf {
            None => None,
            // Reject intervals that cannot be represented in u64 milliseconds.
            Some(conf) if conf.interval.as_millis() > u64::MAX as u128 => {
                return Err(ConfigurationError::InvalidConfiguration {
                    message: "bad configuration for traffic_shaping plugin",
                    error: format!(
                        "cannot set an interval for the rate limit greater than {} ms",
                        u64::MAX
                    ),
                }
                .into());
            }
            Some(conf) => Some(RateLimitLayer::new(conf.capacity, conf.interval)),
        };
        Ok(Self {
            config: init.config,
            rate_limit_router,
            rate_limit_subgraphs: Mutex::new(HashMap::new()),
        })
    }
}
/// Future returned by the wrapped subgraph service: the outer `Either`
/// distinguishes the shaped pipeline (`A`, itself an `Either` of boxed
/// futures produced by the layered service) from the untouched inner
/// service's future (`B`) used when no shaping config applies.
pub(crate) type TrafficShapingSubgraphFuture<S> = Either<
    Either<
        BoxFuture<'static, Result<subgraph::Response, BoxError>>,
        BoxFuture<'static, Result<subgraph::Response, BoxError>>,
    >,
    <S as Service<subgraph::Request>>::Future,
>;
impl TrafficShaping {
    /// Merge the `all` fallback with a subgraph-specific config.
    /// Subgraph values win; returns `None` when neither is configured.
    fn merge_config<T: Merge + Clone>(
        all_config: Option<&T>,
        subgraph_config: Option<&T>,
    ) -> Option<T> {
        let merged_subgraph_config = subgraph_config.map(|c| c.merge(all_config));
        merged_subgraph_config.or_else(|| all_config.cloned())
    }

    /// Wrap the supergraph service with router-level shaping: a timeout
    /// (DEFAULT_TIMEOUT when unset), an optional global rate limit, and
    /// translation of `Elapsed` / `RateLimited` errors into GraphQL error
    /// responses with the matching HTTP status codes.
    pub(crate) fn supergraph_service_internal<S>(
        &self,
        service: S,
    ) -> impl Service<
        supergraph::Request,
        Response = supergraph::Response,
        Error = BoxError,
        Future = BoxFuture<'static, Result<supergraph::Response, BoxError>>,
    > + Clone
    + Send
    + Sync
    + 'static
    where
        S: Service<supergraph::Request, Response = supergraph::Response, Error = BoxError>
            + Clone
            + Send
            + Sync
            + 'static,
        <S as Service<supergraph::Request>>::Future: std::marker::Send,
    {
        ServiceBuilder::new()
            // Capture the request context before the inner future runs so
            // error responses can be built with it afterwards.
            .map_future_with_request_data(
                |req: &supergraph::Request| req.context.clone(),
                move |ctx, future| {
                    async {
                        let response: Result<supergraph::Response, BoxError> = future.await;
                        match response {
                            // Timeout -> 504 with a GraphQL error body.
                            Err(error) if error.is::<Elapsed>() => {
                                supergraph::Response::error_builder()
                                    .status_code(StatusCode::GATEWAY_TIMEOUT)
                                    .error::<graphql::Error>(Elapsed::new().into())
                                    .context(ctx)
                                    .build()
                            }
                            // Rate limited -> 429 with a GraphQL error body.
                            Err(error) if error.is::<RateLimited>() => {
                                supergraph::Response::error_builder()
                                    .status_code(StatusCode::TOO_MANY_REQUESTS)
                                    .error::<graphql::Error>(RateLimited::new().into())
                                    .context(ctx)
                                    .build()
                            }
                            _ => response,
                        }
                    }
                    .boxed()
                },
            )
            .layer(TimeoutLayer::new(
                self.config
                    .router
                    .as_ref()
                    .and_then(|r| r.timeout)
                    .unwrap_or(DEFAULT_TIMEOUT),
            ))
            .option_layer(self.rate_limit_router.clone())
            .service(service)
    }

    /// Wrap a subgraph service with the shaping configured for `name`
    /// (merged over `all`): optional query deduplication, error translation,
    /// timeout, optional rate limit, and a `Content-Encoding` request header
    /// when compression is configured. When no config applies, the inner
    /// service is returned untouched (`Either::B`).
    pub(crate) fn subgraph_service_internal<S>(
        &self,
        name: &str,
        service: S,
    ) -> impl Service<
        subgraph::Request,
        Response = subgraph::Response,
        Error = BoxError,
        Future = TrafficShapingSubgraphFuture<S>,
    > + Clone
    + Send
    + Sync
    + 'static
    where
        S: Service<subgraph::Request, Response = subgraph::Response, Error = BoxError>
            + Clone
            + Send
            + Sync
            + 'static,
        <S as Service<subgraph::Request>>::Future: std::marker::Send,
    {
        let all_config = self.config.all.as_ref();
        let subgraph_config = self.config.subgraphs.get(name);
        let final_config = Self::merge_config(all_config, subgraph_config);
        if let Some(config) = final_config {
            // Reuse (or lazily create) this subgraph's rate limiter so the
            // limit is shared by every clone of the service.
            let rate_limit = config
                .shaping
                .global_rate_limit
                .as_ref()
                .map(|rate_limit_conf| {
                    self.rate_limit_subgraphs
                        .lock()
                        .unwrap()
                        .entry(name.to_string())
                        .or_insert_with(|| {
                            RateLimitLayer::new(rate_limit_conf.capacity, rate_limit_conf.interval)
                        })
                        .clone()
                });
            Either::A(ServiceBuilder::new()
                // Deduplication only when explicitly enabled (default: off).
                .option_layer(config.shaping.deduplicate_query.unwrap_or_default().then(
                    QueryDeduplicationLayer::default
                ))
                .map_future_with_request_data(
                    |req: &subgraph::Request| req.context.clone(),
                    move |ctx, future| {
                        async {
                            let response: Result<subgraph::Response, BoxError> = future.await;
                            match response {
                                // Timeout -> 504 with a GraphQL error body.
                                Err(error) if error.is::<Elapsed>() => {
                                    subgraph::Response::error_builder()
                                        .status_code(StatusCode::GATEWAY_TIMEOUT)
                                        .error::<graphql::Error>(Elapsed::new().into())
                                        .context(ctx)
                                        .build()
                                }
                                // Rate limited -> 429 with a GraphQL error body.
                                Err(error) if error.is::<RateLimited>() => {
                                    subgraph::Response::error_builder()
                                        .status_code(StatusCode::TOO_MANY_REQUESTS)
                                        .error::<graphql::Error>(RateLimited::new().into())
                                        .context(ctx)
                                        .build()
                                }
                                _ => response,
                            }
                        }.boxed()
                    },
                )
                .layer(TimeoutLayer::new(
                    config.shaping
                        .timeout
                        .unwrap_or(DEFAULT_TIMEOUT),
                ))
                .option_layer(rate_limit)
                .service(service)
                // Advertise the configured compression to the subgraph via
                // the Content-Encoding header on the outgoing request.
                .map_request(move |mut req: SubgraphRequest| {
                    if let Some(compression) = config.shaping.compression {
                        let compression_header_val = HeaderValue::from_str(&compression.to_string()).expect("compression is manually implemented and already have the right values; qed");
                        req.subgraph_request.headers_mut().insert(CONTENT_ENCODING, compression_header_val);
                    }
                    req
                }))
        } else {
            Either::B(service)
        }
    }

    /// HTTP client settings (HTTP/2 mode, DNS strategy) for `service_name`,
    /// derived from the merged shaping config; defaults when the subgraph
    /// has no configuration at all.
    pub(crate) fn subgraph_client_config(
        &self,
        service_name: &str,
    ) -> crate::configuration::shared::Client {
        Self::merge_config(
            self.config.all.as_ref(),
            self.config.subgraphs.get(service_name),
        )
        .map(|config| crate::configuration::shared::Client {
            experimental_http2: config.shaping.experimental_http2,
            dns_resolution_strategy: config.shaping.dns_resolution_strategy,
        })
        .unwrap_or_default()
    }
}
// Register the plugin under the name "apollo.traffic_shaping".
register_plugin!("apollo", "traffic_shaping", TrafficShaping);
#[cfg(test)]
mod test {
    use std::sync::Arc;

    use bytes::Bytes;
    use maplit::hashmap;
    use once_cell::sync::Lazy;
    use serde_json_bytes::ByteString;
    use serde_json_bytes::Value;
    use serde_json_bytes::json;
    use tower::Service;

    use super::*;
    use crate::Configuration;
    use crate::json_ext::Object;
    use crate::plugin::DynPlugin;
    use crate::plugin::test::MockSubgraph;
    use crate::plugin::test::MockSupergraphService;
    use crate::query_planner::QueryPlannerService;
    use crate::router_factory::create_plugins;
    use crate::services::HasSchema;
    use crate::services::PluggableSupergraphServiceBuilder;
    use crate::services::SupergraphRequest;
    use crate::services::SupergraphResponse;
    use crate::services::layers::persisted_queries::PersistedQueryLayer;
    use crate::services::layers::query_analysis::QueryAnalysisLayer;
    use crate::services::router;
    use crate::services::router::service::RouterCreator;
    use crate::spec::Schema;

    // Expected full response body for VALID_QUERY against the mocked subgraphs.
    static EXPECTED_RESPONSE: Lazy<Bytes> = Lazy::new(|| {
        Bytes::from_static(r#"{"data":{"topProducts":[{"upc":"1","name":"Table","reviews":[{"id":"1","product":{"name":"Table"},"author":{"id":"1","name":"Ada Lovelace"}},{"id":"4","product":{"name":"Table"},"author":{"id":"2","name":"Alan Turing"}}]},{"upc":"2","name":"Couch","reviews":[{"id":"2","product":{"name":"Couch"},"author":{"id":"1","name":"Ada Lovelace"}}]}]}}"#.as_bytes())
    });

    // Federated query exercising the accounts/reviews/products mock subgraphs.
    static VALID_QUERY: &str = r#"query TopProducts($first: Int) { topProducts(first: $first) { upc name reviews { id product { name } author { id name } } } }"#;

    // Send `query` through `router_service` and assert the first response
    // body equals `body`.
    async fn execute_router_test(
        query: &str,
        body: &Bytes,
        mut router_service: router::BoxService,
    ) {
        let request = SupergraphRequest::fake_builder()
            .query(query.to_string())
            .variable("first", 2usize)
            .build()
            .expect("expecting valid request")
            .try_into()
            .unwrap();

        let response = router_service
            .ready()
            .await
            .unwrap()
            .call(request)
            .await
            .unwrap()
            .next_response()
            .await
            .unwrap()
            .unwrap();
        assert_eq!(response, body);
    }

    // Build a full mock router (three mocked subgraphs + the given
    // traffic-shaping plugin) with variable deduplication enabled.
    async fn build_mock_router_with_variable_dedup_optimization(
        plugin: Box<dyn DynPlugin>,
    ) -> router::BoxService {
        let mut extensions = Object::new();
        extensions.insert("test", Value::String(ByteString::from("value")));

        // Mocked "accounts" subgraph: entity fetch for User names.
        let account_mocks = vec![
            (
                r#"{"query":"query TopProducts__accounts__3($representations:[_Any!]!){_entities(representations:$representations){...on User{name}}}","operationName":"TopProducts__accounts__3","variables":{"representations":[{"__typename":"User","id":"1"},{"__typename":"User","id":"2"}]}}"#,
                r#"{"data":{"_entities":[{"name":"Ada Lovelace"},{"name":"Alan Turing"}]}}"#
            )
        ].into_iter().map(|(query, response)| (serde_json::from_str(query).unwrap(), serde_json::from_str(response).unwrap())).collect();
        let account_service = MockSubgraph::new(account_mocks);

        // Mocked "reviews" subgraph: entity fetch for Product reviews.
        let review_mocks = vec![
            (
                r#"{"query":"query TopProducts__reviews__1($representations:[_Any!]!){_entities(representations:$representations){...on Product{reviews{id product{__typename upc}author{__typename id}}}}}","operationName":"TopProducts__reviews__1","variables":{"representations":[{"__typename":"Product","upc":"1"},{"__typename":"Product","upc":"2"}]}}"#,
                r#"{"data":{"_entities":[{"reviews":[{"id":"1","product":{"__typename":"Product","upc":"1"},"author":{"__typename":"User","id":"1"}},{"id":"4","product":{"__typename":"Product","upc":"1"},"author":{"__typename":"User","id":"2"}}]},{"reviews":[{"id":"2","product":{"__typename":"Product","upc":"2"},"author":{"__typename":"User","id":"1"}}]}]}}"#
            )
        ].into_iter().map(|(query, response)| (serde_json::from_str(query).unwrap(), serde_json::from_str(response).unwrap())).collect();
        let review_service = MockSubgraph::new(review_mocks);

        // Mocked "products" subgraph: root fetch plus entity fetch.
        let product_mocks = vec![
            (
                r#"{"query":"query TopProducts__products__0($first:Int){topProducts(first:$first){__typename upc name}}","operationName":"TopProducts__products__0","variables":{"first":2}}"#,
                r#"{"data":{"topProducts":[{"__typename":"Product","upc":"1","name":"Table"},{"__typename":"Product","upc":"2","name":"Couch"}]}}"#
            ),
            (
                r#"{"query":"query TopProducts__products__2($representations:[_Any!]!){_entities(representations:$representations){...on Product{name}}}","operationName":"TopProducts__products__2","variables":{"representations":[{"__typename":"Product","upc":"1"},{"__typename":"Product","upc":"2"}]}}"#,
                r#"{"data":{"_entities":[{"name":"Table"},{"name":"Couch"}]}}"#
            )
        ].into_iter().map(|(query, response)| (serde_json::from_str(query).unwrap(), serde_json::from_str(response).unwrap())).collect();
        let product_service = MockSubgraph::new(product_mocks).with_extensions(extensions);

        let schema = include_str!(
            "../../../../apollo-router-benchmarks/benches/fixtures/supergraph.graphql"
        );

        let config: Configuration = serde_yaml::from_str(
            r#"
            traffic_shaping:
                deduplicate_variables: true
            supergraph:
                # TODO(@goto-bus-stop): need to update the mocks and remove this, #6013
                generate_query_fragments: false
            "#,
        )
        .unwrap();

        let config = Arc::new(config);
        let schema = Arc::new(Schema::parse(schema, &config).unwrap());
        let planner = QueryPlannerService::new(schema.clone(), config.clone())
            .await
            .unwrap();
        let subgraph_schemas = Arc::new(
            planner
                .subgraph_schemas()
                .iter()
                .map(|(k, v)| (k.clone(), v.schema.clone()))
                .collect(),
        );

        let mut builder =
            PluggableSupergraphServiceBuilder::new(planner).with_configuration(config.clone());

        // Inject the plugin under test alongside the default plugin set.
        let plugins = Arc::new(
            create_plugins(
                &config,
                &schema,
                subgraph_schemas,
                None,
                Some(vec![(APOLLO_TRAFFIC_SHAPING.to_string(), plugin)]),
            )
            .await
            .expect("create plugins should work"),
        );
        builder = builder.with_plugins(plugins);

        let builder = builder
            .with_subgraph_service("accounts", account_service.clone())
            .with_subgraph_service("reviews", review_service.clone())
            .with_subgraph_service("products", product_service.clone());

        let supergraph_creator = builder.build().await.expect("should build");

        RouterCreator::new(
            QueryAnalysisLayer::new(supergraph_creator.schema(), Default::default()).await,
            Arc::new(PersistedQueryLayer::new(&Default::default()).await.unwrap()),
            Arc::new(supergraph_creator),
            Arc::new(Configuration::default()),
        )
        .await
        .unwrap()
        .make()
        .boxed()
    }

    // Instantiate the traffic-shaping plugin from a raw JSON/YAML config.
    async fn get_traffic_shaping_plugin(config: &serde_json::Value) -> Box<dyn DynPlugin> {
        // Build a traffic shaping plugin
        crate::plugin::plugins()
            .find(|factory| factory.name == APOLLO_TRAFFIC_SHAPING)
            .expect("Plugin not found")
            .create_instance_without_schema(config)
            .await
            .expect("Plugin not created")
    }

    // End-to-end: variable deduplication enabled still yields the full,
    // correct response.
    #[tokio::test]
    async fn it_returns_valid_response_for_deduplicated_variables() {
        let config = serde_yaml::from_str::<serde_json::Value>(
            r#"
            deduplicate_variables: true
            "#,
        )
        .unwrap();
        let plugin = get_traffic_shaping_plugin(&config).await;
        let router = build_mock_router_with_variable_dedup_optimization(plugin).await;
        execute_router_test(VALID_QUERY, &EXPECTED_RESPONSE, router).await;
    }

    // Configured compression must surface as a Content-Encoding header on
    // the outgoing subgraph request.
    #[tokio::test]
    async fn it_add_correct_headers_for_compression() {
        let config = serde_yaml::from_str::<serde_json::Value>(
            r#"
            subgraphs:
                test:
                    compression: gzip
            "#,
        )
        .unwrap();

        let plugin = get_traffic_shaping_plugin(&config).await;
        let request = SubgraphRequest::fake_builder().build();

        let test_service = MockSubgraph::new(HashMap::new()).map_request(|req: SubgraphRequest| {
            assert_eq!(
                req.subgraph_request
                    .headers()
                    .get(&CONTENT_ENCODING)
                    .unwrap(),
                HeaderValue::from_static("gzip")
            );

            req
        });

        let _response = plugin
            .as_any()
            .downcast_ref::<TrafficShaping>()
            .unwrap()
            .subgraph_service_internal("test", test_service)
            .oneshot(request)
            .await
            .unwrap();
    }

    // merge_config: subgraph config wins over `all`; either side alone is
    // returned unchanged; both absent yields None.
    #[test]
    fn test_merge_config() {
        let config = serde_yaml::from_str::<Config>(
            r#"
        all:
            deduplicate_query: true
        subgraphs: 
            products:
                deduplicate_query: false
        "#,
        )
        .unwrap();

        assert_eq!(TrafficShaping::merge_config::<Shaping>(None, None), None);
        assert_eq!(
            TrafficShaping::merge_config(config.all.as_ref(), None),
            config.all
        );
        assert_eq!(
            TrafficShaping::merge_config(config.all.as_ref(), config.subgraphs.get("products"))
                .as_ref(),
            config.subgraphs.get("products")
        );
        assert_eq!(
            TrafficShaping::merge_config(None, config.subgraphs.get("products")).as_ref(),
            config.subgraphs.get("products")
        );
    }

    // experimental_http2 merging: per-subgraph value overrides `all`.
    #[test]
    fn test_merge_http2_all() {
        let config = serde_yaml::from_str::<Config>(
            r#"
        all:
            experimental_http2: disable
        subgraphs:
            products:
                experimental_http2: enable
            reviews:
                experimental_http2: disable
        router:
            timeout: 65s
        "#,
        )
        .unwrap();

        assert!(
            TrafficShaping::merge_config(config.all.as_ref(), config.subgraphs.get("products"))
                .unwrap()
                .shaping
                .experimental_http2
                .unwrap()
                == Http2Config::Enable
        );
        assert!(
            TrafficShaping::merge_config(config.all.as_ref(), config.subgraphs.get("reviews"))
                .unwrap()
                .shaping
                .experimental_http2
                .unwrap()
                == Http2Config::Disable
        );
        assert!(
            TrafficShaping::merge_config(config.all.as_ref(), None)
                .unwrap()
                .shaping
                .experimental_http2
                .unwrap()
                == Http2Config::Disable
        );
    }

    // subgraph_client_config: merged http2/DNS settings per subgraph, with
    // `all` as the fallback for unknown subgraphs.
    #[tokio::test]
    async fn test_subgraph_client_config() {
        let config = serde_yaml::from_str::<Config>(
            r#"
        all:
            experimental_http2: disable
            dns_resolution_strategy: ipv6_only
        subgraphs:
            products:
                experimental_http2: enable
                dns_resolution_strategy: ipv6_then_ipv4
            reviews:
                experimental_http2: disable
                dns_resolution_strategy: ipv4_only
        router:
            timeout: 65s
        "#,
        )
        .unwrap();

        let shaping_config = TrafficShaping::new(PluginInit::fake_builder().config(config).build())
            .await
            .unwrap();

        assert_eq!(
            shaping_config.subgraph_client_config("products"),
            crate::configuration::shared::Client {
                experimental_http2: Some(Http2Config::Enable),
                dns_resolution_strategy: Some(DnsResolutionStrategy::Ipv6ThenIpv4),
            },
        );
        assert_eq!(
            shaping_config.subgraph_client_config("reviews"),
            crate::configuration::shared::Client {
                experimental_http2: Some(Http2Config::Disable),
                dns_resolution_strategy: Some(DnsResolutionStrategy::Ipv4Only),
            },
        );
        assert_eq!(
            shaping_config.subgraph_client_config("this_doesnt_exist"),
            crate::configuration::shared::Client {
                experimental_http2: Some(Http2Config::Disable),
                dns_resolution_strategy: Some(DnsResolutionStrategy::Ipv6Only),
            },
        );
    }

    // Per-subgraph rate limiting: second request within the interval is
    // rejected; other subgraphs are unaffected; the limit resets after the
    // interval elapses.
    #[tokio::test(flavor = "multi_thread")]
    async fn it_rate_limit_subgraph_requests() {
        let config = serde_yaml::from_str::<serde_json::Value>(
            r#"
        subgraphs:
            test:
                global_rate_limit:
                    capacity: 1
                    interval: 100ms
                timeout: 500ms
        "#,
        )
        .unwrap();

        let plugin = get_traffic_shaping_plugin(&config).await;

        let test_service = MockSubgraph::new(hashmap! {
            graphql::Request::default() => graphql::Response::default()
        });

        // First request within the window succeeds.
        assert!(
            &plugin
                .as_any()
                .downcast_ref::<TrafficShaping>()
                .unwrap()
                .subgraph_service_internal("test", test_service.clone())
                .oneshot(SubgraphRequest::fake_builder().build())
                .await
                .unwrap()
                .response
                .body()
                .errors
                .is_empty()
        );
        // Second request within the window is rate limited.
        assert_eq!(
            plugin
                .as_any()
                .downcast_ref::<TrafficShaping>()
                .unwrap()
                .subgraph_service_internal("test", test_service.clone())
                .oneshot(SubgraphRequest::fake_builder().build())
                .await
                .unwrap()
                .response
                .body()
                .errors[0]
                .extensions
                .get("code")
                .unwrap(),
            "REQUEST_RATE_LIMITED"
        );
        // Unconfigured subgraphs are not rate limited.
        assert!(
            plugin
                .as_any()
                .downcast_ref::<TrafficShaping>()
                .unwrap()
                .subgraph_service_internal("another", test_service.clone())
                .oneshot(SubgraphRequest::fake_builder().build())
                .await
                .unwrap()
                .response
                .body()
                .errors
                .is_empty()
        );
        // After the interval elapses the limiter admits requests again.
        tokio::time::sleep(Duration::from_millis(300)).await;
        assert!(
            plugin
                .as_any()
                .downcast_ref::<TrafficShaping>()
                .unwrap()
                .subgraph_service_internal("test", test_service.clone())
                .oneshot(SubgraphRequest::fake_builder().build())
                .await
                .unwrap()
                .response
                .body()
                .errors
                .is_empty()
        );
    }

    // Router-level rate limiting: second request within the interval is
    // rejected; the limit resets after the interval elapses.
    #[tokio::test(flavor = "multi_thread")]
    async fn it_rate_limit_router_requests() {
        let config = serde_yaml::from_str::<serde_json::Value>(
            r#"
        router:
            global_rate_limit:
                capacity: 1
                interval: 100ms
            timeout: 500ms
        "#,
        )
        .unwrap();

        let plugin = get_traffic_shaping_plugin(&config).await;
        let mut mock_service = MockSupergraphService::new();
        // Nested expect_clone chain: the service is cloned per call, so each
        // clone needs its own expectations.
        mock_service.expect_clone().returning(|| {
            let mut mock_service = MockSupergraphService::new();

            mock_service.expect_clone().returning(|| {
                let mut mock_service = MockSupergraphService::new();
                mock_service.expect_call().times(0..2).returning(move |_| {
                    Ok(SupergraphResponse::fake_builder()
                        .data(json!({ "test": 1234_u32 }))
                        .build()
                        .unwrap())
                });
                mock_service
            });
            mock_service
        });

        // First request within the window succeeds.
        assert!(
            plugin
                .as_any()
                .downcast_ref::<TrafficShaping>()
                .unwrap()
                .supergraph_service_internal(mock_service.clone())
                .oneshot(SupergraphRequest::fake_builder().build().unwrap())
                .await
                .unwrap()
                .next_response()
                .await
                .unwrap()
                .errors
                .is_empty()
        );
        // Second request within the window is rate limited.
        assert_eq!(
            plugin
                .as_any()
                .downcast_ref::<TrafficShaping>()
                .unwrap()
                .supergraph_service_internal(mock_service.clone())
                .oneshot(SupergraphRequest::fake_builder().build().unwrap())
                .await
                .unwrap()
                .next_response()
                .await
                .unwrap()
                .errors[0]
                .extensions
                .get("code")
                .unwrap(),
            "REQUEST_RATE_LIMITED"
        );
        // After the interval elapses the limiter admits requests again.
        tokio::time::sleep(Duration::from_millis(300)).await;
        assert!(
            plugin
                .as_any()
                .downcast_ref::<TrafficShaping>()
                .unwrap()
                .supergraph_service_internal(mock_service.clone())
                .oneshot(SupergraphRequest::fake_builder().build().unwrap())
                .await
                .unwrap()
                .next_response()
                .await
                .unwrap()
                .errors
                .is_empty()
        );
    }
}