use std::sync::Arc;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::time::Duration;
use apollo_router::graphql;
use apollo_router::services::router;
use apollo_router::services::router::body::RouterBody;
use apollo_router::services::supergraph;
use apollo_router::test_harness::HttpService;
use fred::clients::Client;
use fred::interfaces::ClientLike;
use fred::interfaces::KeysInterface;
use fred::types::Builder;
use http::HeaderMap;
use http::HeaderName;
use http::HeaderValue;
use http_body_util::BodyExt as _;
use indexmap::IndexMap;
use serde_json::Value;
use serde_json::json;
use tokio_util::future::FutureExt;
use tower::BoxError;
use tower::Service as _;
use tower::ServiceExt as _;
use crate::integration::IntegrationTest;
use crate::integration::common::graph_os_enabled;
use crate::integration::common::redact_cache_debug_query_hash;
const REDIS_URL: &str = "redis://127.0.0.1:6379";
const INVALIDATION_PATH: &str = "/invalidation";
const INVALIDATION_SHARED_KEY: &str = "supersecret";
/// Produces a fresh, unique Redis key namespace (a hyphen-less UUID v4) so
/// that each test run is isolated from cached entries of other runs.
pub(crate) fn namespace() -> String {
    let unique = uuid::Uuid::new_v4();
    unique.simple().to_string()
}
/// Builds and initializes a `fred` Redis client pointed at the local test
/// instance ([`REDIS_URL`]).
///
/// # Errors
/// Returns an error if the client cannot be built or fails to connect during
/// `init()`. A panic on `from_url` is acceptable here because the URL is a
/// compile-time constant.
async fn redis_client() -> Result<Client, BoxError> {
let client =
Builder::from_config(fred::prelude::Config::from_url(REDIS_URL).unwrap()).build()?;
// init() establishes the connection; forgetting it makes later gets hang.
client.init().await?;
Ok(client)
}
/// Collects every cache key advertised in the response's
/// `apolloCacheDebugging.data[].key` debug extension.
///
/// Returns an empty `Vec` when the extension (or any intermediate field) is
/// missing or not of the expected shape.
fn extract_cache_keys_from_response(response: &graphql::Response) -> Vec<String> {
    let entries = response
        .extensions
        .get("apolloCacheDebugging")
        .and_then(|debug| debug.get("data"))
        .and_then(|data| data.as_array());
    let mut keys = Vec::new();
    if let Some(entries) = entries {
        for entry in entries {
            if let Some(key) = entry.get("key").and_then(|k| k.as_str()) {
                keys.push(key.to_owned());
            }
        }
    }
    keys
}
/// Groups the debug-extension cache keys by their `subgraphName`, reading the
/// same `apolloCacheDebugging.data` array as
/// [`extract_cache_keys_from_response`].
///
/// Entries missing either `key` or `subgraphName` are skipped; an absent or
/// malformed extension yields an empty map.
fn extract_cache_keys_by_subgraph(
    response: &graphql::Response,
) -> std::collections::HashMap<String, Vec<String>> {
    let entries = response
        .extensions
        .get("apolloCacheDebugging")
        .and_then(|debug| debug.get("data"))
        .and_then(|data| data.as_array());
    let mut by_subgraph: std::collections::HashMap<String, Vec<String>> =
        std::collections::HashMap::new();
    for entry in entries.into_iter().flatten() {
        let key = entry.get("key").and_then(|k| k.as_str());
        let subgraph = entry.get("subgraphName").and_then(|s| s.as_str());
        if let (Some(key), Some(subgraph)) = (key, subgraph) {
            by_subgraph
                .entry(subgraph.to_owned())
                .or_default()
                .push(key.to_owned());
        }
    }
    by_subgraph
}
/// Base router configuration shared by most tests: response caching enabled
/// for all subgraphs with a fresh per-call Redis [`namespace`], the
/// invalidation endpoint enabled, and the `private_id` header propagated so
/// private cache entries can be scoped per user.
fn base_config() -> Value {
    json!({
        "include_subgraph_errors": {
            "all": true,
        },
        "rhai": {
            "scripts": "tests/integration/fixtures",
            "main": "test_cache.rhai",
        },
        "headers": {
            "all": {
                "request": [
                    {
                        "propagate": {
                            "named": "private_id"
                        }
                    }
                ]
            }
        },
        "response_cache": {
            "enabled": true,
            "subgraph": {
                "all": {
                    "redis": {
                        // Use the shared constant so every config points at the
                        // same test Redis instance (was a duplicated literal).
                        "urls": [REDIS_URL],
                        "pool_size": 3,
                        // Fresh namespace per call keeps tests isolated.
                        "namespace": namespace(),
                        "required_to_start": true,
                    },
                    "ttl": "10m",
                    "invalidation": {
                        "enabled": true,
                        "shared_key": INVALIDATION_SHARED_KEY,
                    },
                    "private_id": "private_id"
                },
            },
            "invalidation": {
                "listen": "127.0.0.1:4000",
                "path": INVALIDATION_PATH,
            },
        },
    })
}
/// Configuration that exposes Prometheus metrics and enables response caching
/// for all subgraphs plus an explicit `user` subgraph override — used to
/// verify that Redis client pools are shared rather than duplicated per
/// subgraph.
fn config_with_subgraph_prometheus() -> Value {
    json!({
        "telemetry": {
            "exporters": {
                "metrics": {
                    "prometheus": {
                        "enabled": true,
                        "listen": "127.0.0.1:9090",
                        "path": "/metrics"
                    }
                }
            }
        },
        "response_cache": {
            "enabled": true,
            "subgraph": {
                "all": {
                    "redis": {
                        // Shared constant instead of a duplicated URL literal.
                        "urls": [REDIS_URL],
                        "pool_size": 3,
                        "namespace": namespace(),
                        "required_to_start": true,
                    },
                    "ttl": "10m",
                    "private_id": "private_id"
                },
                "subgraphs": {
                    "user": {
                        "enabled": true,
                        "private_id": "user"
                    }
                }
            }
        },
    })
}
/// Configuration with a deliberately unreachable Redis URL and
/// `required_to_start: false`, used to prove the router keeps serving
/// traffic when the cache storage is unavailable.
fn failure_config() -> Value {
json!({
"include_subgraph_errors": {
"all": true,
},
"response_cache": {
"enabled": true,
"subgraph": {
"all": {
"redis": {
// Intentionally invalid: cache writes/reads must fail gracefully.
"urls": ["redis://invalid"],
"pool_size": 3,
"namespace": namespace(),
// Router must still start even though Redis is unreachable.
"required_to_start": false,
},
"ttl": "10m",
"invalidation": {
"enabled": true,
"shared_key": INVALIDATION_SHARED_KEY,
},
},
},
"invalidation": {
"listen": "127.0.0.1:4000",
"path": INVALIDATION_PATH,
},
},
})
}
/// Mocked `products` and `reviews` subgraph responses with `cache-control:
/// public`, so every fetch is cacheable. The `__cacheTags` fields declare the
/// invalidation tags each object carries.
fn base_subgraphs() -> Value {
json!({
"products": {
"headers": {"cache-control": "public"},
"query": {
"topProducts": [
{"upc": "1", "__cacheTags": ["topProducts"]},
{"upc": "2"},
],
},
},
"reviews": {
"headers": {"cache-control": "public"},
// Entity responses for Product keys "1" and "2".
"entities": [
{
"__cacheTags": ["product-1"],
"__typename": "Product",
"upc": "1",
"reviews": [{"id": "r1a"}, {"id": "r1b"}],
},
{
"__cacheTags": ["product-2"],
"__typename": "Product",
"upc": "2",
"reviews": [{"id": "r2"}],
},
],
},
})
}
fn private_base_subgraphs() -> Value {
json!({
"products": {
"headers": {"cache-control": "private"},
"query": {
"topProducts": [
{"upc": "1", "__cacheTags": ["topProducts"]},
{"upc": "2"},
],
},
},
"reviews": {
"headers": {"cache-control": "private"},
"entities": [
{
"__cacheTags": ["product-1"],
"__typename": "Product",
"upc": "1",
"reviews": [{"id": "r1a"}, {"id": "r1b"}],
},
{
"__cacheTags": ["product-2"],
"__typename": "Product",
"upc": "2",
"reviews": [{"id": "r2"}],
},
],
},
})
}
/// Builds an in-process router HTTP service over mocked subgraphs and returns
/// it together with per-subgraph request counters.
///
/// The `subgraphs` JSON is installed into `config` as
/// `experimental_mock_subgraphs`, and a subgraph hook increments the matching
/// counter for every request that actually reaches a subgraph — letting tests
/// assert whether a fetch was served from the cache instead.
async fn harness(
mut config: Value,
subgraphs: Value,
) -> (HttpService, Arc<IndexMap<String, Arc<AtomicUsize>>>) {
// One counter per mocked subgraph; Relaxed ordering is fine for test-only counts.
let counters = Arc::new(IndexMap::from([
("products".into(), Default::default()),
("reviews".into(), Default::default()),
]));
let counters2 = Arc::clone(&counters);
config
.as_object_mut()
.unwrap()
.insert("experimental_mock_subgraphs".into(), subgraphs);
let router = apollo_router::TestHarness::builder()
.schema(include_str!("../../testing_schema.graphql"))
.configuration_json(config)
.unwrap()
.subgraph_hook(move |subgraph_name, service| {
// Wrap only the subgraphs we track; others pass through untouched.
if let Some(counter) = counters2.get(subgraph_name) {
let counter = Arc::<AtomicUsize>::clone(counter);
service
.map_request(move |req| {
counter.fetch_add(1, Ordering::Relaxed);
req
})
.boxed()
} else {
service
}
})
.build_http_service()
.await
.unwrap();
(router, counters)
}
/// Sends the canonical `topProducts.reviews` query and returns the response
/// headers (stringified) together with the parsed GraphQL body.
async fn make_graphql_request(router: &mut HttpService) -> (HeaderMap<String>, graphql::Response) {
    let request = graphql_request("{ topProducts { reviews { id } } }");
    make_http_request(router, request.into()).await
}
/// Like [`make_graphql_request`], but sets the `apollo-cache-debugging`
/// header so the response carries the `apolloCacheDebugging` extension.
async fn make_debug_graphql_request(
    router: &mut HttpService,
) -> (HeaderMap<String>, graphql::Response) {
    let mut request: http::Request<router::Body> =
        graphql_request("{ topProducts { reviews { id } } }").into();
    request.headers_mut().insert(
        HeaderName::from_static("apollo-cache-debugging"),
        HeaderValue::from_static("true"),
    );
    make_http_request(router, request).await
}
/// Sends the canonical query with the caller-supplied headers attached
/// (e.g. `private_id` for private-cache scoping).
///
/// NOTE(review): name is missing an underscore (`makegraphql` →
/// `make_graphql`); kept as-is because in-file callers use it.
async fn makegraphql_request_with_headers(
    router: &mut HttpService,
    headers: HeaderMap,
) -> (HeaderMap<String>, graphql::Response) {
    let mut request: http::Request<router::Body> =
        graphql_request("{ topProducts { reviews { id } } }").into();
    for (name, value) in headers {
        // Owned iteration yields Option<HeaderName> (None for repeated
        // header values); tests only pass single-valued maps.
        request.headers_mut().insert(name.unwrap(), value);
    }
    make_http_request(router, request).await
}
/// Builds a router-level request carrying `query`, going through the fake
/// supergraph request builder so defaults (method, content-type) are filled in.
fn graphql_request(query: &str) -> router::Request {
    let supergraph_request = supergraph::Request::fake_builder()
        .query(query)
        .build()
        .unwrap();
    supergraph_request.try_into().unwrap()
}
/// Serializes a JSON request body (used for the invalidation endpoint) and
/// performs the HTTP call, deserializing the response body as JSON.
async fn make_json_request(
    router: &mut HttpService,
    request: http::Request<Value>,
) -> (HeaderMap<String>, Value) {
    let request = request.map(|json_body| {
        let bytes = serde_json::to_vec(&json_body).unwrap();
        router::body::from_bytes(bytes)
    });
    make_http_request(router, request).await
}
/// Drives one request through the router service and returns the response
/// headers as owned strings plus the JSON-deserialized body.
///
/// Panics on transport errors or a non-deserializable body — acceptable in
/// these tests, where any such failure is a bug.
async fn make_http_request<ResponseBody>(
    router: &mut HttpService,
    request: http::Request<router::Body>,
) -> (HeaderMap<String>, ResponseBody)
where
    ResponseBody: for<'a> serde::Deserialize<'a>,
{
    let service = router.ready().await.unwrap();
    let response = service.call(request).await.unwrap();
    // Copy headers out before consuming the response body.
    let headers: HeaderMap<String> = response
        .headers()
        .iter()
        .map(|(name, value)| (name.to_owned(), value.to_str().unwrap().to_string()))
        .collect();
    let bytes = response.into_body().collect().await.unwrap().to_bytes();
    let body = serde_json::from_slice(&bytes).unwrap();
    (headers, body)
}
/// With one pool (size 3) configured for "all" plus a per-subgraph override,
/// the router must report exactly 3 Redis clients — i.e. the pool is shared,
/// not duplicated per subgraph.
#[tokio::test(flavor = "multi_thread")]
async fn dont_duplicate_redis_connections() {
// All tests in this file need GraphOS-gated features; skip otherwise.
if !graph_os_enabled() {
return;
}
let mut router = IntegrationTest::builder()
.config(serde_yaml::to_string(&config_with_subgraph_prometheus()).unwrap())
.build()
.await;
router.start().await;
router.assert_started().await;
// The Prometheus gauge counts live Redis clients.
router
.assert_metrics_contains(
r#"apollo_router_cache_redis_clients{otel_scope_name="apollo/router"} 3"#,
None,
)
.await;
}
/// Happy path: the first request populates the cache (counters go to 1), and
/// a repeat of the same query is served entirely from cache (counters stay
/// at 1).
#[tokio::test(flavor = "multi_thread")]
async fn basic_cache_skips_subgraph_request() {
if !graph_os_enabled() {
return;
}
let (mut router, subgraph_request_counters) = harness(base_config(), base_subgraphs()).await;
insta::assert_yaml_snapshot!(subgraph_request_counters, @r"
products: 0
reviews: 0
");
let (headers, body) = make_graphql_request(&mut router).await;
assert!(headers["cache-control"].contains("public"));
insta::assert_yaml_snapshot!(body, @r"
data:
topProducts:
- reviews:
- id: r1a
- id: r1b
- reviews:
- id: r2
");
// First request hit both subgraphs once.
insta::assert_yaml_snapshot!(subgraph_request_counters, @r"
products: 1
reviews: 1
");
// Give the async cache-write task time to land in Redis.
tokio::time::sleep(Duration::from_millis(100)).await;
let (headers, body) = make_graphql_request(&mut router).await;
assert!(headers["cache-control"].contains("public"));
insta::assert_yaml_snapshot!(body, @r"
data:
topProducts:
- reviews:
- id: r1a
- id: r1b
- reviews:
- id: r2
");
// Counters unchanged: the second request never reached the subgraphs.
insta::assert_yaml_snapshot!(subgraph_request_counters, @r"
products: 1
reviews: 1
");
}
/// Private caching: responses marked `cache-control: private` are cached per
/// `private_id`. A request without the id and one with `private_id: 123` each
/// miss once (counters 1 then 2), but repeating the identified request hits
/// the cache (counters stay at 2).
#[tokio::test(flavor = "multi_thread")]
async fn private_id_set_at_subgraph_request() {
if !graph_os_enabled() {
return;
}
let (mut router, subgraph_request_counters) =
harness(base_config(), private_base_subgraphs()).await;
insta::assert_yaml_snapshot!(subgraph_request_counters, @r"
products: 0
reviews: 0
");
// First request: no private_id header — fills the anonymous entry.
let (headers, body) = make_graphql_request(&mut router).await;
assert!(headers["cache-control"].contains("private"));
insta::assert_yaml_snapshot!(body, @r"
data:
topProducts:
- reviews:
- id: r1a
- id: r1b
- reviews:
- id: r2
");
insta::assert_yaml_snapshot!(subgraph_request_counters, @r"
products: 1
reviews: 1
");
// Second request with private_id=123: different cache scope → miss again.
let mut headers = HeaderMap::new();
headers.insert(
HeaderName::from_static("private_id"),
HeaderValue::from_static("123"),
);
let (resp_headers, body) = makegraphql_request_with_headers(&mut router, headers.clone()).await;
assert!(resp_headers["cache-control"].contains("private"));
insta::assert_yaml_snapshot!(body, @r"
data:
topProducts:
- reviews:
- id: r1a
- id: r1b
- reviews:
- id: r2
");
insta::assert_yaml_snapshot!(subgraph_request_counters, @r"
products: 2
reviews: 2
");
// Allow the async cache write to complete before the repeat request.
tokio::time::sleep(Duration::from_millis(100)).await;
// Same private_id again: served from the private cache entry.
let (headers, body) = makegraphql_request_with_headers(&mut router, headers.clone()).await;
assert!(headers["cache-control"].contains("private"));
insta::assert_yaml_snapshot!(body, @r"
data:
topProducts:
- reviews:
- id: r1a
- id: r1b
- reviews:
- id: r2
");
insta::assert_yaml_snapshot!(subgraph_request_counters, @r"
products: 2
reviews: 2
");
}
/// Asserts that, in order, each debugger entry's `invalidationKeys` matches
/// the corresponding expected tag list.
///
/// Panics if the debug extension is missing or has fewer entries than
/// `cache_tags` (matching the original helper's behavior).
fn check_cache_tags(response: &graphql::Response, cache_tags: Vec<Vec<String>>) {
    let mut debugger_entries = response
        .extensions
        .get("apolloCacheDebugging")
        .unwrap()
        .as_object()
        .unwrap()
        .get("data")
        .unwrap()
        .as_array()
        .unwrap()
        .iter();
    for expected_tags in cache_tags {
        let entry = debugger_entries.next().unwrap().as_object().unwrap();
        // Build the expected JSON array up front, then compare in one shot.
        let expected: Vec<serde_json_bytes::Value> = expected_tags
            .into_iter()
            .map(|tag| serde_json_bytes::Value::String(tag.into()))
            .collect();
        assert_eq!(
            entry.get("invalidationKeys"),
            Some(&serde_json_bytes::Value::Array(expected))
        );
    }
}
/// Verifies that the `__cacheTags` declared by the mocked subgraphs surface
/// as `invalidationKeys` in the cache-debugging extension, both on the
/// initial (miss) request and on the cached repeat.
#[tokio::test(flavor = "multi_thread")]
async fn check_cache_tags_from_debugger_data() {
if !graph_os_enabled() {
return;
}
// Enable the response_cache "debug" flag so the extension is emitted.
let mut config = base_config();
config
.get_mut("response_cache")
.and_then(|c| c.as_object_mut())
.and_then(|c| c.insert("debug".to_string(), true.into()));
let (mut router, subgraph_request_counters) = harness(config, base_subgraphs()).await;
insta::assert_yaml_snapshot!(subgraph_request_counters, @r"
products: 0
reviews: 0
");
let (headers, body) = make_debug_graphql_request(&mut router).await;
assert!(headers["cache-control"].contains("public"));
assert!(body.errors.is_empty());
insta::assert_yaml_snapshot!(body.data, @r"
topProducts:
- reviews:
- id: r1a
- id: r1b
- reviews:
- id: r2
");
// Tags come from the fixtures: topProducts root + one per Product entity.
check_cache_tags(
&body,
vec![
vec!["topProducts".to_string()],
vec!["product-1".to_string()],
vec!["product-2".to_string()],
],
);
insta::assert_yaml_snapshot!(subgraph_request_counters, @r"
products: 1
reviews: 1
");
// Let the cache write land, then confirm tags are also reported on a hit.
tokio::time::sleep(Duration::from_millis(100)).await;
let (headers, body) = make_debug_graphql_request(&mut router).await;
assert!(headers["cache-control"].contains("public"));
assert!(body.errors.is_empty());
insta::assert_yaml_snapshot!(body.data, @r"
topProducts:
- reviews:
- id: r1a
- id: r1b
- reviews:
- id: r2
");
check_cache_tags(
&body,
vec![
vec!["topProducts".to_string()],
vec!["product-1".to_string()],
vec!["product-2".to_string()],
],
);
insta::assert_yaml_snapshot!(subgraph_request_counters, @r"
products: 1
reviews: 1
");
}
/// With an unreachable Redis ([`failure_config`]), requests still succeed —
/// they just always reach the subgraphs (counters increase on every request
/// because nothing can be cached).
#[tokio::test(flavor = "multi_thread")]
async fn no_failure_when_storage_unavailable() {
if !graph_os_enabled() {
return;
}
let (mut router, subgraph_request_counters) = harness(failure_config(), base_subgraphs()).await;
insta::assert_yaml_snapshot!(subgraph_request_counters, @r"
products: 0
reviews: 0
");
let (headers, body) = make_graphql_request(&mut router).await;
assert!(headers["cache-control"].contains("public"));
insta::assert_yaml_snapshot!(body, @r"
data:
topProducts:
- reviews:
- id: r1a
- id: r1b
- reviews:
- id: r2
");
insta::assert_yaml_snapshot!(subgraph_request_counters, @r"
products: 1
reviews: 1
");
let (headers, body) = make_graphql_request(&mut router).await;
assert!(headers["cache-control"].contains("public"));
insta::assert_yaml_snapshot!(body, @r"
data:
topProducts:
- reviews:
- id: r1a
- id: r1b
- reviews:
- id: r2
");
// Counters advanced again: no cache hit without working storage.
insta::assert_yaml_snapshot!(subgraph_request_counters, @r"
products: 2
reviews: 2
");
}
/// Subgraph responses without a `cache-control` header must not be cached:
/// the router responds `no-store` and the subgraph counters advance on every
/// request.
#[tokio::test(flavor = "multi_thread")]
async fn not_cached_without_cache_control_header() {
if !graph_os_enabled() {
return;
}
// Strip the cache-control headers from both mocked subgraphs.
let mut subgraphs = base_subgraphs();
subgraphs["products"]
.as_object_mut()
.unwrap()
.remove("headers");
subgraphs["reviews"]
.as_object_mut()
.unwrap()
.remove("headers");
let (mut router, subgraph_request_counters) = harness(base_config(), subgraphs).await;
insta::assert_yaml_snapshot!(subgraph_request_counters, @r"
products: 0
reviews: 0
");
let (headers, body) = make_graphql_request(&mut router).await;
assert_eq!(headers["cache-control"], "no-store");
insta::assert_yaml_snapshot!(body, @r"
data:
topProducts:
- reviews:
- id: r1a
- id: r1b
- reviews:
- id: r2
");
insta::assert_yaml_snapshot!(subgraph_request_counters, @r"
products: 1
reviews: 1
");
// Even after a delay (time for any cache write), the repeat request must
// still reach the subgraphs.
tokio::time::sleep(Duration::from_millis(100)).await;
let (headers, body) = make_graphql_request(&mut router).await;
assert_eq!(headers["cache-control"], "no-store");
insta::assert_yaml_snapshot!(body, @r"
data:
topProducts:
- reviews:
- id: r1a
- id: r1b
- reviews:
- id: r2
");
insta::assert_yaml_snapshot!(subgraph_request_counters, @r"
products: 2
reviews: 2
");
}
/// Invalidation endpoint, `kind: type`: invalidating every `Product` entity
/// of the `reviews` subgraph (2 entries) forces the next request to re-fetch
/// reviews only — products stays cached.
#[tokio::test(flavor = "multi_thread")]
async fn invalidate_with_endpoint_by_type() {
if !graph_os_enabled() {
return;
}
let (mut router, subgraph_request_counters) = harness(base_config(), base_subgraphs()).await;
let (headers, body) = make_graphql_request(&mut router).await;
assert!(headers["cache-control"].contains("public"));
assert!(body.errors.is_empty());
insta::assert_yaml_snapshot!(subgraph_request_counters, @r"
products: 1
reviews: 1
");
let request = http::Request::builder()
.method("POST")
.uri(INVALIDATION_PATH)
.header("Authorization", INVALIDATION_SHARED_KEY)
.body(json!([{
"kind": "type",
"subgraph": "reviews",
"type": "Product"
}]))
.unwrap();
// The cache write is asynchronous; retry until both Product entries are
// reported invalidated (count: 2), snapshotting the last body on failure.
for i in 0..10 {
let (_headers, body) = make_json_request(&mut router, request.clone()).await;
let expected_value = json!({"count": 2});
if body == expected_value {
break;
} else if i == 9 {
insta::assert_yaml_snapshot!(body, @"count: 2");
}
}
// Only reviews is refetched; products is still served from cache.
let (headers, body) = make_graphql_request(&mut router).await;
assert!(headers["cache-control"].contains("public"));
assert!(body.errors.is_empty());
insta::assert_yaml_snapshot!(subgraph_request_counters, @r"
products: 1
reviews: 2
");
}
/// Invalidation endpoint, `kind: cache_tag`: invalidating tag `product-1`
/// removes exactly one reviews entry (count: 1); the next request re-fetches
/// reviews while products stays cached.
#[tokio::test(flavor = "multi_thread")]
async fn invalidate_with_endpoint_by_entity_cache_tag() {
if !graph_os_enabled() {
return;
}
let (mut router, subgraph_request_counters) = harness(base_config(), base_subgraphs()).await;
let (headers, body) = make_graphql_request(&mut router).await;
assert!(headers["cache-control"].contains("public"));
assert!(body.errors.is_empty());
insta::assert_yaml_snapshot!(subgraph_request_counters, @r"
products: 1
reviews: 1
");
let request = http::Request::builder()
.method("POST")
.uri(INVALIDATION_PATH)
.header("Authorization", INVALIDATION_SHARED_KEY)
.body(json!([{
"kind": "cache_tag",
"subgraphs": ["reviews"],
"cache_tag": "product-1",
}]))
.unwrap();
// Retry until the asynchronously-written entry is visible and invalidated.
for i in 0..10 {
let (_headers, body) = make_json_request(&mut router, request.clone()).await;
let expected_value = json!({"count": 1});
if body == expected_value {
break;
} else if i == 9 {
insta::assert_yaml_snapshot!(body, @"count: 1");
}
}
let (headers, body) = make_graphql_request(&mut router).await;
assert!(headers["cache-control"].contains("public"));
assert!(body.errors.is_empty());
insta::assert_yaml_snapshot!(subgraph_request_counters, @r"
products: 1
reviews: 2
");
}
/// Single-fetch query: the client-facing cache-control mirrors the one
/// subgraph involved (s-maxage=120 → max-age=120), and on a cache hit the
/// remaining max-age is decremented by the entry's age.
#[tokio::test]
async fn cache_control_merging_single_fetch() {
if !graph_os_enabled() {
return;
}
let mut subgraphs = base_subgraphs();
subgraphs["products"]["headers"]["cache-control"] = "public, s-maxage=120".into();
subgraphs["reviews"]["headers"]["cache-control"] = "public, s-maxage=60".into();
let (mut router, _subgraph_request_counters) = harness(base_config(), subgraphs).await;
// Only the products subgraph serves this query.
let query = "{ topProducts { upc } }";
let (headers, _body) =
make_http_request::<graphql::Response>(&mut router, graphql_request(query).into()).await;
insta::assert_snapshot!(&headers["cache-control"], @"max-age=120,public");
tokio::time::sleep(Duration::from_secs(2)).await;
let query = "{ topProducts { upc } }";
let (headers, _body) =
make_http_request::<graphql::Response>(&mut router, graphql_request(query).into()).await;
let cache_control = &headers["cache-control"];
// After ~2s the served entry's remaining lifetime must have shrunk.
let max_age = parse_max_age(cache_control);
assert!(max_age > 100 && max_age < 120, "got '{cache_control}'");
}
/// Entities with composite/nested keys (see `entity_key_complex.graphql`)
/// must produce stable cache keys; the debug extension is snapshotted with
/// the query hash and timestamps redacted.
#[tokio::test]
async fn complex_entity_key_response_cache() {
if !graph_os_enabled() {
return;
}
let subgraphs = json!({
"stuff": {
"query": {
"getStatus": {
"id": "1",
"items": [{"id": "i1", "name": "Item"}],
"stuffDetails": "stuff we have"
}
}
},
"status": {
"entities": [{
"__typename": "Status",
"id": "1",
"items": [{"id": "i1", "name": "Item"}],
"statusDetails": "status details"
}]
}
});
// Base config + mocked subgraphs + debug extension enabled.
let mut config = base_config();
{
let config_mut = config.as_object_mut().unwrap();
config_mut.insert("experimental_mock_subgraphs".into(), subgraphs);
config_mut
.get_mut("response_cache")
.unwrap()
.as_object_mut()
.unwrap()
.insert("debug".into(), true.into());
}
let router = apollo_router::TestHarness::builder()
.schema(include_str!("./fixtures/entity_key_complex.graphql"))
.configuration_json(config)
.unwrap()
.build_http_service()
.await
.unwrap();
let mut router = router;
let query = r#"{
getStatus(id: "1") {
id
items { id name }
stuffDetails
statusDetails
}
}"#;
let mut http_req: http::Request<RouterBody> = graphql_request(query).into();
http_req.headers_mut().insert(
HeaderName::from_static("apollo-cache-debugging"),
HeaderValue::from_static("true"),
);
let (_, body) = make_http_request::<graphql::Response>(&mut router, http_req).await;
assert!(body.errors.is_empty());
let expectation: serde_json_bytes::Value = json!({"getStatus":{"id":"1","items":[{"id":"i1","name":"Item"}],"stuffDetails":"stuff we have","statusDetails":"status details"}}).into();
assert_eq!(body.data, Some(expectation));
// Redact the query hash inside each key and the creation timestamp, which
// vary between runs.
insta::assert_json_snapshot!(body.extensions, {
".apolloCacheDebugging.data[].key" => insta::dynamic_redaction(|value, _path| {
redact_cache_debug_query_hash(value.as_str().unwrap())
}),
".apolloCacheDebugging.data[].cacheControl.created" => 0
});
}
/// Same shape as `complex_entity_key_response_cache`, but with a `null`
/// field inside the entity key data — cache key generation must handle
/// nullable values without erroring or producing unstable keys.
#[tokio::test]
async fn test_cache_keys_nullable_data() {
if !graph_os_enabled() {
return;
}
let subgraphs = json!({
"stuff": {
"query": {
"getStatus": {
"id": "1",
"items": [{"id": "i1", "name": null}],
"stuffDetails": "stuff we have"
}
}
},
"status": {
"entities": [{
"__typename": "Status",
"id": "1",
"items": [{"id": "i1", "name": null}],
"statusDetails": "status details"
}]
}
});
// Base config + mocked subgraphs + debug extension enabled.
let mut config = base_config();
{
let config_mut = config.as_object_mut().unwrap();
config_mut.insert("experimental_mock_subgraphs".into(), subgraphs);
config_mut
.get_mut("response_cache")
.unwrap()
.as_object_mut()
.unwrap()
.insert("debug".into(), true.into());
}
let router = apollo_router::TestHarness::builder()
.schema(include_str!(
"./fixtures/entity_key_with_null_fields.graphql"
))
.configuration_json(config)
.unwrap()
.build_http_service()
.await
.unwrap();
let mut router = router;
let query = r#"{
getStatus(id: "1") {
id
items { id name }
stuffDetails
statusDetails
}
}"#;
let mut http_req: http::Request<RouterBody> = graphql_request(query).into();
http_req.headers_mut().insert(
HeaderName::from_static("apollo-cache-debugging"),
HeaderValue::from_static("true"),
);
let (_, body) = make_http_request::<graphql::Response>(&mut router, http_req).await;
assert!(body.errors.is_empty());
let expectation: serde_json_bytes::Value = json!({"getStatus":{"id":"1","items":[{"id":"i1","name": null}],"stuffDetails":"stuff we have","statusDetails":"status details"}}).into();
assert_eq!(body.data, Some(expectation));
// Redact run-dependent key hashes and creation timestamps.
insta::assert_json_snapshot!(body.extensions, {
".apolloCacheDebugging.data[].key" => insta::dynamic_redaction(|value, _path| {
redact_cache_debug_query_hash(value.as_str().unwrap())
}),
".apolloCacheDebugging.data[].cacheControl.created" => 0
});
}
/// Multi-fetch query: the client-facing cache-control is the most
/// restrictive across subgraphs (min of s-maxage 120 and 60 → max-age=60),
/// and a cache hit reports the decremented remaining lifetime.
#[tokio::test]
async fn cache_control_merging_multi_fetch() {
if !graph_os_enabled() {
return;
}
let mut subgraphs = base_subgraphs();
subgraphs["products"]["headers"]["cache-control"] = "public, s-maxage=120".into();
subgraphs["reviews"]["headers"]["cache-control"] = "public, s-maxage=60".into();
let (mut router, _subgraph_request_counters) = harness(base_config(), subgraphs).await;
// This query fans out to both products and reviews.
let query = "{ topProducts { reviews { id } } }";
let (headers, _body) =
make_http_request::<graphql::Response>(&mut router, graphql_request(query).into()).await;
insta::assert_snapshot!(&headers["cache-control"], @"max-age=60,public");
tokio::time::sleep(Duration::from_secs(2)).await;
let (headers, _body) =
make_http_request::<graphql::Response>(&mut router, graphql_request(query).into()).await;
let cache_control = &headers["cache-control"];
// Remaining lifetime must have shrunk below the merged 60s ceiling.
let max_age = parse_max_age(cache_control);
assert!(max_age > 40 && max_age < 60, "got '{cache_control}'");
}
/// Extracts the seconds from a header of the exact form
/// `max-age={seconds},public`.
///
/// # Panics
/// Panics (with the offending header in the message) when the header does
/// not match that shape or the number fails to parse.
fn parse_max_age(cache_control: &str) -> u32 {
    let digits = cache_control
        .strip_prefix("max-age=")
        .and_then(|rest| rest.strip_suffix(",public"));
    match digits.and_then(|d| d.parse().ok()) {
        Some(seconds) => seconds,
        None => panic!("expected 'max-age={{seconds}},public', got '{cache_control}'"),
    }
}
/// Polls Redis (up to 10 one-second ticks) for `{namespace}:{cache_key}`,
/// then snapshots the cached entry's `data` payload.
///
/// A macro (not a fn) so that `insta` snapshot names resolve to the calling
/// test. Panics if Redis times out or the key never appears.
macro_rules! check_cache_key {
($namespace: expr, $cache_key: expr, $client: expr) => {
let key = format!("{}:{}", $namespace, $cache_key);
let mut record: Option<String> = None;
let mut interval = tokio::time::interval(Duration::from_secs(1));
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
for _ in 0..10 {
interval.tick().await;
match $client
.get(key.clone())
.timeout(Duration::from_secs(5))
.await
{
Ok(Ok(resp)) => {
record = Some(resp);
break;
}
// Outer Err = the 5s timeout fired: almost always a client that
// was never initialized, hence the hint.
Err(_) => panic!("long timeout connecting to redis - did you call client.init()?"),
// Inner Err / key missing: retry on the next tick.
_ => {}
}
}
match record {
Some(s) => {
// Snapshot only the "data" portion of the stored cache entry.
let cache_value: Value = serde_json::from_str(&s).unwrap();
let v: Value = cache_value.get("data").unwrap().clone();
insta::assert_json_snapshot!(v);
}
None => panic!("cache key not found after retries: {}", key),
}
};
}
/// Like [`check_cache_key!`] but only asserts presence of
/// `{namespace}:{cache_key}` in Redis (no snapshot), retrying up to 10
/// one-second ticks to allow for the asynchronous cache write.
macro_rules! assert_cache_key_exists {
($namespace: expr, $cache_key: expr, $client: expr) => {
let key = format!("{}:{}", $namespace, $cache_key);
let mut found = false;
let mut interval = tokio::time::interval(Duration::from_secs(1));
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
for _ in 0..10 {
interval.tick().await;
match $client
.get::<Option<String>, _>(key.clone())
.timeout(Duration::from_secs(5))
.await
{
Ok(Ok(Some(_))) => {
found = true;
break;
}
// Outer Err = timeout reaching Redis, usually an uninitialized client.
Err(_) => panic!("long timeout connecting to redis - did you call client.init()?"),
// Missing key or transient error: retry on the next tick.
_ => {}
}
}
if !found {
panic!("cache key not found after retries: {}", key);
}
};
}
/// Single (non-retrying) existence check for `{namespace}:{cache_key}` in
/// Redis; used to assert keys are *gone* after invalidation.
async fn cache_key_exists(
    namespace: &str,
    cache_key: &str,
    client: &Client,
) -> Result<bool, fred::error::Error> {
    let namespaced_key = format!("{namespace}:{cache_key}");
    let hits: u32 = client.exists(namespaced_key).await?;
    Ok(hits == 1)
}
/// End-to-end test against a real Redis:
/// 1. A caching router answers a query and every advertised cache key must
///    appear in Redis under this test's namespace.
/// 2. A second router with `all.enabled: false` but products/reviews enabled
///    still uses those entries.
/// 3. A third router's invalidation endpoint purges all `Product` entries of
///    the reviews subgraph (count 3), leaving the products entry intact.
#[tokio::test(flavor = "multi_thread")]
async fn integration_test_basic() -> Result<(), BoxError> {
if !graph_os_enabled() {
return Ok(());
}
let namespace = namespace();
let client = redis_client().await?;
// Mocked subgraphs: three products, each with cacheable (public) reviews.
let subgraphs = json!({
"products": {
"query": {"topProducts": [{
"__typename": "Product",
"upc": "1",
"name": "chair"
},
{
"__typename": "Product",
"upc": "2",
"name": "table"
},
{
"__typename": "Product",
"upc": "3",
"name": "plate"
}]},
"headers": {"cache-control": "public"},
},
"reviews": {
"entities": [{
"__typename": "Product",
"upc": "1",
"reviews": [{
"__typename": "Review",
"body": "I can sit on it",
}]
},
{
"__typename": "Product",
"upc": "2",
"reviews": [{
"__typename": "Review",
"body": "I can sit on it",
}, {
"__typename": "Review",
"body": "I can sit on it2",
}]
},
{
"__typename": "Product",
"upc": "3",
"reviews": [{
"__typename": "Review",
"body": "I can sit on it",
}, {
"__typename": "Review",
"body": "I can sit on it2",
}, {
"__typename": "Review",
"body": "I can sit on it3",
}]
}],
"headers": {"cache-control": "public"},
}
});
// First router: caching enabled for all subgraphs, per-subgraph TTLs.
let supergraph = apollo_router::TestHarness::builder()
.configuration_json(json!({
"response_cache": {
"enabled": true,
"debug": true,
"invalidation": {
"listen": "127.0.0.1:4000",
"path": "/invalidation"
},
"subgraph": {
"all": {
"enabled": true,
"redis": {
"urls": ["redis://127.0.0.1:6379"],
"namespace": namespace,
"pool_size": 3
},
},
"subgraphs": {
"products": {
"enabled": true,
"ttl": "60s"
},
"reviews": {
"enabled": true,
"ttl": "10s"
}
}
}
},
"include_subgraph_errors": {
"all": true
},
"experimental_mock_subgraphs": subgraphs.clone()
}))
.unwrap()
.schema(include_str!("../fixtures/supergraph-auth.graphql"))
.build_supergraph()
.await?;
let request = supergraph::Request::fake_builder()
.query(r#"{ topProducts { name reviews { body } } }"#)
.method(http::Method::POST)
.header("apollo-cache-debugging", "true")
.build()?;
let response = supergraph
.oneshot(request)
.await?
.next_response()
.await
.unwrap();
// Pull the cache keys out of the debug extension before snapshotting.
let cache_keys = extract_cache_keys_from_response(&response);
let cache_keys_by_subgraph = extract_cache_keys_by_subgraph(&response);
insta::assert_json_snapshot!(response, {
".extensions.apolloCacheDebugging.data[].key" => insta::dynamic_redaction(|value, _path| {
redact_cache_debug_query_hash(value.as_str().unwrap())
}),
".extensions.apolloCacheDebugging.data[].cacheControl.created" => 0
});
let products_cache_key = cache_keys_by_subgraph
.get("products")
.and_then(|v| v.first())
.unwrap();
let reviews_cache_keys = cache_keys_by_subgraph.get("reviews").unwrap();
// Spot-check stored payloads, then require every advertised key in Redis.
check_cache_key!(&namespace, products_cache_key, &client);
check_cache_key!(&namespace, reviews_cache_keys.last().unwrap(), &client);
for cache_key in &cache_keys {
assert_cache_key_exists!(&namespace, cache_key, &client);
}
// Second router: "all" disabled, but products/reviews explicitly enabled —
// entries written above must still be used for an overlapping query.
let supergraph = apollo_router::TestHarness::builder()
.configuration_json(json!({
"response_cache": {
"enabled": true,
"debug": true,
"invalidation": {
"listen": "127.0.0.1:4000",
"path": "/invalidation"
},
"subgraph": {
"all": {
"enabled": false,
"redis": {
"urls": ["redis://127.0.0.1:6379"],
"namespace": namespace,
},
},
"subgraphs": {
"products": {
"enabled": true,
"ttl": "60s"
},
"reviews": {
"enabled": true,
"ttl": "10s"
}
}
}
},
"include_subgraph_errors": {
"all": true
},
"experimental_mock_subgraphs": subgraphs.clone()
}))
.unwrap()
.schema(include_str!("../fixtures/supergraph-auth.graphql"))
.build_supergraph()
.await?;
let request = supergraph::Request::fake_builder()
.query(r#"{ topProducts(first: 2) { name reviews { body } } }"#)
.header("apollo-cache-debugging", "true")
.method(http::Method::POST)
.build()?;
let response = supergraph
.oneshot(request)
.await?
.next_response()
.await
.unwrap();
insta::assert_json_snapshot!(response, {
".extensions.apolloCacheDebugging.data[].key" => insta::dynamic_redaction(|value, _path| {
redact_cache_debug_query_hash(value.as_str().unwrap())
}),
".extensions.apolloCacheDebugging.data[].cacheControl.created" => 0
});
for review_key in reviews_cache_keys {
assert_cache_key_exists!(&namespace, review_key, &client);
}
// Third router: invalidation endpoint guarded by a shared key.
const SECRET_SHARED_KEY: &str = "supersecret";
let http_service = apollo_router::TestHarness::builder()
.configuration_json(json!({
"response_cache": {
"enabled": true,
"invalidation": {
"listen": "127.0.0.1:4000",
"path": "/invalidation"
},
"subgraph": {
"all": {
"enabled": true,
"redis": {
"urls": ["redis://127.0.0.1:6379"],
"namespace": namespace,
},
"invalidation": {
"enabled": true,
"shared_key": SECRET_SHARED_KEY
}
},
"subgraphs": {
"products": {
"enabled": true,
"ttl": "60s",
"invalidation": {
"enabled": true,
"shared_key": SECRET_SHARED_KEY
}
},
"reviews": {
"enabled": true,
"ttl": "10s",
"invalidation": {
"enabled": true,
"shared_key": SECRET_SHARED_KEY
}
}
}
}
},
"include_subgraph_errors": {
"all": true
},
"experimental_mock_subgraphs": subgraphs.clone()
}))
.unwrap()
.schema(include_str!("../fixtures/supergraph-auth.graphql"))
.build_http_service()
.await?;
// Invalidate every cached Product entity of the reviews subgraph.
let request = http::Request::builder()
.uri("http://127.0.0.1:4000/invalidation")
.method(http::Method::POST)
.header(
http::header::CONTENT_TYPE,
HeaderValue::from_static("application/json"),
)
.header(
http::header::AUTHORIZATION,
HeaderValue::from_static(SECRET_SHARED_KEY),
)
.body(router::body::from_bytes(
serde_json::to_vec(&vec![json!({
"subgraph": "reviews",
"kind": "type",
"type": "Product"
})])
.unwrap(),
))
.unwrap();
let response = http_service.oneshot(request).await.unwrap();
let response_status = response.status();
let mut resp: Value = serde_json::from_str(
&router::body::into_string(response.into_body())
.await
.unwrap(),
)
.unwrap();
// Three Product entities were cached for reviews → count must be 3.
assert_eq!(
resp.as_object_mut()
.unwrap()
.get("count")
.unwrap()
.as_u64()
.unwrap(),
3u64
);
assert!(response_status.is_success());
// Reviews entries were purged; the products root fetch remains cached.
for review_key in reviews_cache_keys {
assert!(!cache_key_exists(&namespace, review_key, &client).await?);
}
assert!(cache_key_exists(&namespace, products_cache_key, &client).await?);
Ok(())
}
/// Same three-router flow as `integration_test_basic`, but over a schema
/// whose entity `@key` uses a nested field set (`User` keyed on email +
/// country.a) — verifying cache keys, reuse, and `kind: type` invalidation
/// for nested-key entities.
#[tokio::test(flavor = "multi_thread")]
async fn integration_test_with_nested_field_set() -> Result<(), BoxError> {
if !graph_os_enabled() {
return Ok(());
}
let namespace = namespace();
let schema = include_str!("../../src/testdata/supergraph_nested_fields.graphql");
let client = redis_client().await?;
// Mocked subgraphs: a product whose creator is a User entity with a
// nested key (email + country.a).
let subgraphs = json!({
"products": {
"query": {"allProducts": [{
"id": "1",
"name": "Test",
"sku": "150",
"createdBy": { "__typename": "User", "email": "test@test.com", "country": {"a": "France"} }
}]},
"headers": {"cache-control": "public"},
},
"users": {
"entities": [{
"__typename": "User",
"email": "test@test.com",
"name": "test",
"country": {
"a": "France"
}
}],
"headers": {"cache-control": "public"},
}
});
// First router: caching enabled for all subgraphs; populates the cache.
let supergraph = apollo_router::TestHarness::builder()
.configuration_json(json!({
"response_cache": {
"enabled": true,
"debug": true,
"invalidation": {
"listen": "127.0.0.1:4000",
"path": "/invalidation"
},
"subgraph": {
"all": {
"enabled": true,
"redis": {
"urls": ["redis://127.0.0.1:6379"],
"namespace": namespace,
"pool_size": 3
},
}
}
},
"include_subgraph_errors": {
"all": true
},
"experimental_mock_subgraphs": subgraphs.clone()
}))
.unwrap()
.schema(schema)
.build_supergraph()
.await?;
let query = "query { allProducts { name createdBy { name country { a } } } }";
let request = supergraph::Request::fake_builder()
.query(query)
.header("apollo-cache-debugging", "true")
.method(http::Method::POST)
.build()?;
let response = supergraph
.oneshot(request)
.await?
.next_response()
.await
.unwrap();
// Capture advertised cache keys before snapshotting the response.
let cache_keys = extract_cache_keys_from_response(&response);
let cache_keys_by_subgraph = extract_cache_keys_by_subgraph(&response);
insta::assert_json_snapshot!(response, {
".extensions.apolloCacheDebugging.data[].key" => insta::dynamic_redaction(|value, _path| {
redact_cache_debug_query_hash(value.as_str().unwrap())
}),
".extensions.apolloCacheDebugging.data[].cacheControl.created" => 0
});
let users_cache_keys = cache_keys_by_subgraph.get("users").unwrap();
let products_cache_key = cache_keys_by_subgraph
.get("products")
.and_then(|v| v.first())
.unwrap();
// Spot-check stored payloads, then require every advertised key in Redis.
check_cache_key!(&namespace, products_cache_key, &client);
check_cache_key!(&namespace, users_cache_keys.first().unwrap(), &client);
for cache_key in &cache_keys {
assert_cache_key_exists!(&namespace, cache_key, &client);
}
// Second router: "all" disabled with per-subgraph overrides; previously
// written entries must survive and be re-asserted below.
let supergraph = apollo_router::TestHarness::builder()
.configuration_json(json!({
"response_cache": {
"enabled": true,
"debug": true,
"invalidation": {
"listen": "127.0.0.1:4000",
"path": "/invalidation"
},
"subgraph": {
"all": {
"enabled": false,
"redis": {
"urls": ["redis://127.0.0.1:6379"],
"namespace": namespace,
},
},
"subgraphs": {
"products": {
"enabled": true,
"ttl": "60s"
},
"reviews": {
"enabled": true,
"ttl": "10s"
}
}
}
},
"include_subgraph_errors": {
"all": true
},
"experimental_mock_subgraphs": subgraphs.clone()
}))
.unwrap()
.schema(schema)
.build_supergraph()
.await?;
let request = supergraph::Request::fake_builder()
.query(query)
.method(http::Method::POST)
.build()?;
let response = supergraph
.oneshot(request)
.await?
.next_response()
.await
.unwrap();
insta::assert_json_snapshot!(response, {
".extensions.apolloCacheDebugging.data[].key" => insta::dynamic_redaction(|value, _path| {
redact_cache_debug_query_hash(value.as_str().unwrap())
}),
".extensions.apolloCacheDebugging.data[].cacheControl.created" => 0
});
for user_key in users_cache_keys {
assert_cache_key_exists!(&namespace, user_key, &client);
}
// Third router: invalidation endpoint guarded by a shared key.
const SECRET_SHARED_KEY: &str = "supersecret";
let http_service = apollo_router::TestHarness::builder()
.configuration_json(json!({
"response_cache": {
"enabled": true,
"debug": true,
"invalidation": {
"listen": "127.0.0.1:4000",
"path": "/invalidation"
},
"subgraph": {
"all": {
"enabled": true,
"redis": {
"urls": ["redis://127.0.0.1:6379"],
"namespace": namespace,
},
"invalidation": {
"enabled": true,
"shared_key": SECRET_SHARED_KEY
}
},
"subgraphs": {
"products": {
"enabled": true,
"ttl": "60s",
"invalidation": {
"enabled": true,
"shared_key": SECRET_SHARED_KEY
}
},
"reviews": {
"enabled": true,
"ttl": "10s",
"invalidation": {
"enabled": true,
"shared_key": SECRET_SHARED_KEY
}
}
}
}
},
"include_subgraph_errors": {
"all": true
},
"experimental_mock_subgraphs": subgraphs.clone()
}))
.unwrap()
.schema(schema)
.build_http_service()
.await?;
// Invalidate the single cached User entity of the users subgraph.
let request = http::Request::builder()
.uri("http://127.0.0.1:4000/invalidation")
.method(http::Method::POST)
.header(
http::header::CONTENT_TYPE,
HeaderValue::from_static("application/json"),
)
.header(
http::header::AUTHORIZATION,
HeaderValue::from_static(SECRET_SHARED_KEY),
)
.body(router::body::from_bytes(
serde_json::to_vec(&vec![json!({
"subgraph": "users",
"kind": "type",
"type": "User"
})])
.unwrap(),
))
.unwrap();
let response = http_service.oneshot(request).await.unwrap();
let response_status = response.status();
let mut resp: Value = serde_json::from_str(
&router::body::into_string(response.into_body())
.await
.unwrap(),
)
.unwrap();
// Exactly one User entity was cached → count must be 1.
assert_eq!(
resp.as_object_mut()
.unwrap()
.get("count")
.unwrap()
.as_u64()
.unwrap(),
1u64
);
assert!(response_status.is_success());
// User entries were purged; the products root fetch remains cached.
for user_key in users_cache_keys {
assert!(!cache_key_exists(&namespace, user_key, &client).await?);
}
let products_cache_key = cache_keys_by_subgraph
.get("products")
.and_then(|v| v.first())
.unwrap();
assert!(cache_key_exists(&namespace, products_cache_key, &client).await?);
Ok(())
}