use std::sync::Arc;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use apollo_router::graphql;
use apollo_router::services;
use apollo_router::test_harness::HttpService;
use http::HeaderMap;
use http_body_util::BodyExt as _;
use indexmap::IndexMap;
use serde_json::json;
use tower::Service as _;
use tower::ServiceExt as _;
use crate::integration::common::IntegrationTest;
use crate::integration::common::Query;
use crate::integration::common::graph_os_enabled;
/// Path of the cache invalidation endpoint: written into the router configuration and used
/// as the URI of the invalidation request sent by the tests.
const INVALIDATION_PATH: &str = "/invalidation";
/// Shared key written into the subgraph invalidation configuration and sent as the
/// `Authorization` header of invalidation requests.
const INVALIDATION_SHARED_KEY: &str = "supersecret";
/// Builds the router configuration shared by most entity cache tests.
///
/// The configuration enables `preview_entity_cache` for all subgraphs against
/// `redis://127.0.0.1:6379` (required at startup, 10 minute TTL) and uses a freshly
/// generated UUID as the Redis `namespace` on every call. Invalidation is enabled with
/// [`INVALIDATION_SHARED_KEY`] and configured at [`INVALIDATION_PATH`]. Subgraph errors
/// are included in responses.
fn base_config() -> serde_json::Value {
    let redis = json!({
        "urls": ["redis://127.0.0.1:6379"],
        "ttl": "10m",
        // A new namespace per call, generated from a random UUID.
        "namespace": uuid::Uuid::new_v4().simple().to_string(),
        "required_to_start": true,
    });
    let subgraph_invalidation = json!({
        "enabled": true,
        "shared_key": INVALIDATION_SHARED_KEY,
    });
    let invalidation_endpoint = json!({
        "listen": "127.0.0.1:4000",
        "path": INVALIDATION_PATH,
    });

    json!({
        "include_subgraph_errors": {
            "all": true,
        },
        "preview_entity_cache": {
            "enabled": true,
            "subgraph": {
                "all": {
                    "redis": redis,
                    "invalidation": subgraph_invalidation,
                },
            },
            "invalidation": invalidation_endpoint,
        },
    })
}
/// Builds the default mock subgraph definitions used by the tests.
///
/// `products` answers `topProducts` with two products (`upc` "1" and "2"); `reviews`
/// provides the `Product` entities with their reviews. Both subgraphs are configured with
/// a `cache-control: public` header.
fn base_subgraphs() -> serde_json::Value {
    let public_headers = json!({"cache-control": "public"});
    let top_products = json!([
        {"upc": "1"},
        {"upc": "2"},
    ]);
    let product_reviews = json!([
        {"__typename": "Product", "upc": "1", "reviews": [{"id": "r1a"}, {"id": "r1b"}]},
        {"__typename": "Product", "upc": "2", "reviews": [{"id": "r2"}]},
    ]);

    json!({
        "products": {
            "headers": public_headers.clone(),
            "query": {
                "topProducts": top_products,
            },
        },
        "reviews": {
            "headers": public_headers,
            "entities": product_reviews,
        },
    })
}
/// Builds a router `HttpService` through `TestHarness` and returns it together with
/// per-subgraph request counters.
///
/// `subgraphs` is stored in `config` under the `experimental_mock_subgraphs` key before
/// the router is built. The returned map has one counter for `products` and one for
/// `reviews`; each is incremented every time a request passes through the matching
/// subgraph service. Services for any other subgraph name are left untouched.
///
/// Panics if `config` is not a JSON object, or if the configuration or the router build
/// is rejected.
async fn harness(
    mut config: serde_json::Value,
    subgraphs: serde_json::Value,
) -> (HttpService, Arc<IndexMap<String, Arc<AtomicUsize>>>) {
    let counters: Arc<IndexMap<String, Arc<AtomicUsize>>> = Arc::new(IndexMap::from([
        (String::from("products"), Arc::default()),
        (String::from("reviews"), Arc::default()),
    ]));
    // The hook closure needs its own handle; the original is returned to the caller.
    let hook_counters = Arc::clone(&counters);

    let config_object = config.as_object_mut().unwrap();
    config_object.insert("experimental_mock_subgraphs".into(), subgraphs);

    let builder = apollo_router::TestHarness::builder()
        .schema(include_str!("../../testing_schema.graphql"))
        .configuration_json(config)
        .unwrap()
        .subgraph_hook(move |subgraph_name, service| {
            match hook_counters.get(subgraph_name) {
                Some(counter) => {
                    let counter = Arc::<AtomicUsize>::clone(counter);
                    service
                        .map_request(move |req| {
                            counter.fetch_add(1, Ordering::Relaxed);
                            req
                        })
                        .boxed()
                }
                None => service,
            }
        });
    let router = builder.build_http_service().await.unwrap();

    (router, counters)
}
/// Sends the fixed query `{ topProducts { reviews { id } } }` to `router` and returns the
/// response headers together with the body parsed as a GraphQL response.
async fn make_graphql_request(router: &mut HttpService) -> (HeaderMap<String>, graphql::Response) {
    let http_request = graphql_request("{ topProducts { reviews { id } } }").into();
    make_http_request(router, http_request).await
}
/// Wraps `query` into a router-level request.
///
/// The request is first built with the supergraph fake builder and then converted into a
/// `services::router::Request`. Panics if either the build or the conversion fails.
fn graphql_request(query: &str) -> services::router::Request {
    let supergraph_request = services::supergraph::Request::fake_builder()
        .query(query)
        .build()
        .unwrap();
    supergraph_request.try_into().unwrap()
}
async fn make_json_request(
router: &mut HttpService,
request: http::Request<serde_json::Value>,
) -> (HeaderMap<String>, serde_json::Value) {
let request =
request.map(|body| services::router::body::from_bytes(serde_json::to_vec(&body).unwrap()));
make_http_request(router, request).await
}
/// Calls `router` with `request` and returns the response headers (values converted to
/// `String`) together with the whole body deserialized from JSON as `ResponseBody`.
///
/// Panics if the service is not ready or the call fails, if a header value cannot be
/// converted with `to_str`, if collecting the body fails, or if the body does not
/// deserialize into `ResponseBody`.
async fn make_http_request<ResponseBody>(
    router: &mut HttpService,
    request: http::Request<apollo_router::services::router::Body>,
) -> (HeaderMap<String>, ResponseBody)
where
    ResponseBody: for<'a> serde::Deserialize<'a>,
{
    let service = router.ready().await.unwrap();
    let response = service.call(request).await.unwrap();

    // Copy the headers out before the response is consumed for its body.
    let headers = response
        .headers()
        .iter()
        .map(|(name, value)| (name.clone(), String::from(value.to_str().unwrap())))
        .collect::<HeaderMap<String>>();

    let collected = response.into_body().collect().await.unwrap();
    let bytes = collected.to_bytes();
    let parsed = serde_json::from_slice(&bytes).unwrap();
    (headers, parsed)
}
/// Checks that repeating the same query does not trigger new subgraph requests.
///
/// The test returns immediately when `graph_os_enabled()` is false.
#[tokio::test]
async fn basic_cache_skips_subgraph_request() {
if !graph_os_enabled() {
return;
}
let (mut router, subgraph_request_counters) = harness(base_config(), base_subgraphs()).await;
// Nothing has been sent to the subgraphs before the first query.
insta::assert_yaml_snapshot!(subgraph_request_counters, @r###"
products: 0
reviews: 0
"###);
// First query: the response is marked `public` and each subgraph is called once.
let (headers, body) = make_graphql_request(&mut router).await;
assert!(headers["cache-control"].contains("public"));
insta::assert_yaml_snapshot!(body, @r###"
data:
topProducts:
- reviews:
- id: r1a
- id: r1b
- reviews:
- id: r2
"###);
insta::assert_yaml_snapshot!(subgraph_request_counters, @r###"
products: 1
reviews: 1
"###);
// Second, identical query: same response body, and the counters do not move, so no
// subgraph received another request.
let (headers, body) = make_graphql_request(&mut router).await;
assert!(headers["cache-control"].contains("public"));
insta::assert_yaml_snapshot!(body, @r###"
data:
topProducts:
- reviews:
- id: r1a
- id: r1b
- reviews:
- id: r2
"###);
insta::assert_yaml_snapshot!(subgraph_request_counters, @r###"
products: 1
reviews: 1
"###);
}
/// Checks that subgraphs are called again on every query when the mock subgraphs are
/// configured without any `headers` entry (and therefore without `cache-control`).
///
/// The test returns immediately when `graph_os_enabled()` is false.
#[tokio::test]
async fn not_cached_without_cache_control_header() {
if !graph_os_enabled() {
return;
}
let mut subgraphs = base_subgraphs();
// Drop the `headers` object (which holds `cache-control: public`) from both subgraphs.
subgraphs["products"]
.as_object_mut()
.unwrap()
.remove("headers");
subgraphs["reviews"]
.as_object_mut()
.unwrap()
.remove("headers");
let (mut router, subgraph_request_counters) = harness(base_config(), subgraphs).await;
// Nothing has been sent to the subgraphs before the first query.
insta::assert_yaml_snapshot!(subgraph_request_counters, @r###"
products: 0
reviews: 0
"###);
// First query: the router answers with `cache-control: no-store` and calls each
// subgraph once.
let (headers, body) = make_graphql_request(&mut router).await;
assert_eq!(headers["cache-control"], "no-store");
insta::assert_yaml_snapshot!(body, @r###"
data:
topProducts:
- reviews:
- id: r1a
- id: r1b
- reviews:
- id: r2
"###);
insta::assert_yaml_snapshot!(subgraph_request_counters, @r###"
products: 1
reviews: 1
"###);
// Second, identical query: both counters go up again, so both subgraphs were called a
// second time.
let (headers, body) = make_graphql_request(&mut router).await;
assert_eq!(headers["cache-control"], "no-store");
insta::assert_yaml_snapshot!(body, @r###"
data:
topProducts:
- reviews:
- id: r1a
- id: r1b
- reviews:
- id: r2
"###);
insta::assert_yaml_snapshot!(subgraph_request_counters, @r###"
products: 2
reviews: 2
"###);
}
/// Checks that an invalidation request for one `Product` entity of the `reviews` subgraph
/// makes the next query call `reviews` again, while `products` is not called again.
///
/// The test returns immediately when `graph_os_enabled()` is false.
#[tokio::test]
async fn invalidate_with_endpoint() {
    if !graph_os_enabled() {
        return;
    }

    let (mut router, subgraph_request_counters) = harness(base_config(), base_subgraphs()).await;

    // First query: each subgraph is called once.
    let (first_headers, first_response) = make_graphql_request(&mut router).await;
    assert!(first_headers["cache-control"].contains("public"));
    assert!(first_response.errors.is_empty());
    insta::assert_yaml_snapshot!(subgraph_request_counters, @r###"
products: 1
reviews: 1
"###);

    // Invalidate the `Product` entity with `upc` "1" in the `reviews` subgraph. The shared
    // key is sent as the `Authorization` header.
    let invalidation_payload = json!([{
        "kind": "entity",
        "subgraph": "reviews",
        "type": "Product",
        "key": {
            "upc": "1",
        },
    }]);
    let invalidation_request = http::Request::builder()
        .method("POST")
        .uri(INVALIDATION_PATH)
        .header("Authorization", INVALIDATION_SHARED_KEY)
        .body(invalidation_payload)
        .unwrap();
    let (_headers, invalidation_result) =
        make_json_request(&mut router, invalidation_request).await;
    insta::assert_yaml_snapshot!(invalidation_result, @"count: 1");

    // Same query again: only the `reviews` counter increases.
    let (second_headers, second_response) = make_graphql_request(&mut router).await;
    assert!(second_headers["cache-control"].contains("public"));
    assert!(second_response.errors.is_empty());
    insta::assert_yaml_snapshot!(subgraph_request_counters, @r###"
products: 1
reviews: 2
"###);
}
/// Checks the `cache-control` response header for the query `{ topProducts { upc } }`.
///
/// `products` is configured with `s-maxage=120` and `reviews` with `s-maxage=60`. The first
/// response is expected to carry `max-age=120,public`; after a two second pause the same
/// query must report a `max-age` strictly between 100 and 120.
///
/// The test returns immediately when `graph_os_enabled()` is false.
#[tokio::test]
async fn cache_control_merging_single_fetch() {
    if !graph_os_enabled() {
        return;
    }

    let mut subgraphs = base_subgraphs();
    let overrides = [
        ("products", "public, s-maxage=120"),
        ("reviews", "public, s-maxage=60"),
    ];
    for (subgraph, directives) in overrides {
        subgraphs[subgraph]["headers"]["cache-control"] = directives.into();
    }
    let (mut router, _subgraph_request_counters) = harness(base_config(), subgraphs).await;

    let query = "{ topProducts { upc } }";

    let (first_headers, _first_body) =
        make_http_request::<graphql::Response>(&mut router, graphql_request(query).into()).await;
    insta::assert_snapshot!(&first_headers["cache-control"], @"max-age=120,public");

    // Let some time pass so the second response reports a reduced max-age.
    tokio::time::sleep(std::time::Duration::from_secs(2)).await;

    let (second_headers, _second_body) =
        make_http_request::<graphql::Response>(&mut router, graphql_request(query).into()).await;
    let cache_control = &second_headers["cache-control"];
    let max_age = parse_max_age(cache_control);
    assert!(100 < max_age && max_age < 120, "got '{cache_control}'");
}
/// Checks the `cache-control` response header for the query
/// `{ topProducts { reviews { id } } }`.
///
/// `products` is configured with `s-maxage=120` and `reviews` with `s-maxage=60`. The first
/// response is expected to carry `max-age=60,public` (the smaller of the two values);
/// after a two second pause the same query must report a `max-age` strictly between 40
/// and 60.
///
/// The test returns immediately when `graph_os_enabled()` is false.
#[tokio::test]
async fn cache_control_merging_multi_fetch() {
    if !graph_os_enabled() {
        return;
    }

    let mut subgraphs = base_subgraphs();
    let overrides = [
        ("products", "public, s-maxage=120"),
        ("reviews", "public, s-maxage=60"),
    ];
    for (subgraph, directives) in overrides {
        subgraphs[subgraph]["headers"]["cache-control"] = directives.into();
    }
    let (mut router, _subgraph_request_counters) = harness(base_config(), subgraphs).await;

    let query = "{ topProducts { reviews { id } } }";

    let (first_headers, _first_body) =
        make_http_request::<graphql::Response>(&mut router, graphql_request(query).into()).await;
    insta::assert_snapshot!(&first_headers["cache-control"], @"max-age=60,public");

    // Let some time pass so the second response reports a reduced max-age.
    tokio::time::sleep(std::time::Duration::from_secs(2)).await;

    let (second_headers, _second_body) =
        make_http_request::<graphql::Response>(&mut router, graphql_request(query).into()).await;
    let cache_control = &second_headers["cache-control"];
    let max_age = parse_max_age(cache_control);
    assert!(40 < max_age && max_age < 60, "got '{cache_control}'");
}
/// Extracts the number of seconds from a header value of the exact form
/// `max-age=<seconds>,public`.
///
/// # Panics
///
/// Panics with a message quoting `cache_control` when the value does not start with
/// `max-age=`, does not end with `,public`, or when the part in between is not a valid
/// `u32`.
fn parse_max_age(cache_control: &str) -> u32 {
    let seconds = match cache_control.strip_prefix("max-age=") {
        Some(rest) => rest.strip_suffix(",public"),
        None => None,
    };
    match seconds.map(str::parse::<u32>) {
        Some(Ok(max_age)) => max_age,
        _ => panic!("expected 'max-age={{seconds}},public', got '{cache_control}'"),
    }
}
/// Builds mock subgraph definitions with `count` products.
///
/// Products are numbered `1..=count`. `products` returns them all from `topProducts`, and
/// `reviews` holds one `Product` entity per product with a single review whose id is
/// `r<upc>`. Both subgraphs are configured with a `cache-control: public` header.
fn subgraphs_with_many_entities(count: usize) -> serde_json::Value {
    let (top_products, reviews): (Vec<serde_json::Value>, Vec<serde_json::Value>) = (1..=count)
        .map(|number| {
            let upc = number.to_string();
            let product = json!({ "upc": upc.as_str() });
            let entity = json!({
                "__typename": "Product",
                "upc": upc.as_str(),
                "reviews": [{ "id": format!("r{upc}") }],
            });
            (product, entity)
        })
        .unzip();

    json!({
        "products": {
            "headers": {"cache-control": "public"},
            "query": { "topProducts": top_products },
        },
        "reviews": {
            "headers": {"cache-control": "public"},
            "entities": reviews,
        },
    })
}
/// Runs a router through `IntegrationTest` with the entity cache and a Prometheus
/// exporter enabled, sends seven identical queries over 1,000 products, and checks the
/// Redis cache metrics that are exposed afterwards.
///
/// The test returns immediately when `graph_os_enabled()` is false.
#[tokio::test(flavor = "multi_thread")]
async fn test_cache_metrics() {
    if !graph_os_enabled() {
        return;
    }

    const NUM_PRODUCTS: usize = 1_000;

    let redis = json!({
        "urls": ["redis://127.0.0.1:6379"],
        "ttl": "10m",
        "namespace": uuid::Uuid::new_v4().simple().to_string(),
        "required_to_start": true,
        "metrics_interval": "100ms",
    });
    let prometheus = json!({
        "enabled": true,
        "listen": "127.0.0.1:0",
        "path": "/metrics",
    });
    let config = json!({
        "include_subgraph_errors": {
            "all": true,
        },
        "preview_entity_cache": {
            "enabled": true,
            "subgraph": {
                "all": {
                    "redis": redis,
                    "invalidation": {
                        "enabled": true,
                        "shared_key": INVALIDATION_SHARED_KEY,
                    },
                },
            },
            "invalidation": {
                "listen": "127.0.0.1:4000",
                "path": INVALIDATION_PATH,
            },
        },
        "telemetry": {
            "exporters": {
                "metrics": {
                    "prometheus": prometheus,
                },
            },
        },
        "experimental_mock_subgraphs": subgraphs_with_many_entities(NUM_PRODUCTS),
    });

    let mut router = IntegrationTest::builder()
        .config(serde_yaml::to_string(&config).unwrap())
        .build()
        .await;
    router.start().await;
    router.assert_started().await;

    // Every request in this test sends the same query.
    let reviews_query = || {
        Query::builder()
            .body(json!({"query":"{ topProducts { reviews { id } } }","variables":{}}))
            .build()
    };

    // The first two responses are checked in full: all products must come back.
    for _ in 0..2 {
        let (_trace_id, response) = router.execute_query(reviews_query()).await;
        assert_eq!(response.status(), 200);
        let body: serde_json::Value = response.json().await.unwrap();
        let top_products = body["data"]["topProducts"]
            .as_array()
            .expect("topProducts should be array");
        assert_eq!(top_products.len(), NUM_PRODUCTS);
    }

    // Five more requests, checked for status only.
    for _ in 0..5 {
        let (_trace_id, response) = router.execute_query(reviews_query()).await;
        assert_eq!(response.status(), 200);
    }

    // Wait several `metrics_interval` periods (100ms each) before reading the metrics —
    // presumably so that the counters have been published.
    tokio::time::sleep(std::time::Duration::from_millis(500)).await;

    let expected_metrics = [
        r#"apollo_router_cache_redis_clients{otel_scope_name="apollo/router"} 1"#,
        r#"apollo_router_cache_redis_redelivery_count_total{kind="entity",otel_scope_name="apollo/router"} 0"#,
        r#"apollo_router_cache_redis_commands_executed_total{kind="entity",otel_scope_name="apollo/router"} 16"#,
        r#"apollo_router_cache_redis_command_queue_length{kind="entity",otel_scope_name="apollo/router"} 0"#,
    ];
    for expected in expected_metrics {
        router.assert_metrics_contains(expected, None).await;
    }

    router.graceful_shutdown().await;
}
/// Runs a router whose entity cache points at `redis://127.0.0.1:9999` with
/// `required_to_start` disabled, sends three queries, and checks that a Redis `io` error
/// is reported in the metrics.
///
/// NOTE(review): the test appears to rely on nothing listening on port 9999 — confirm for
/// the environment it runs in.
///
/// The test returns immediately when `graph_os_enabled()` is false.
#[tokio::test(flavor = "multi_thread")]
async fn test_cache_error_metrics() {
    if !graph_os_enabled() {
        return;
    }

    let redis = json!({
        "urls": ["redis://127.0.0.1:9999"],
        "ttl": "10m",
        "namespace": uuid::Uuid::new_v4().simple().to_string(),
        "required_to_start": false,
        "metrics_interval": "100ms",
    });
    let prometheus = json!({
        "enabled": true,
        "listen": "127.0.0.1:0",
        "path": "/metrics",
    });
    let config = json!({
        "include_subgraph_errors": {
            "all": true,
        },
        "preview_entity_cache": {
            "enabled": true,
            "subgraph": {
                "all": {
                    "redis": redis,
                },
            },
        },
        "telemetry": {
            "exporters": {
                "metrics": {
                    "prometheus": prometheus,
                },
            },
        },
        "experimental_mock_subgraphs": subgraphs_with_many_entities(10),
    });

    let mut router = IntegrationTest::builder()
        .config(serde_yaml::to_string(&config).unwrap())
        .build()
        .await;
    router.start().await;
    router.assert_started().await;

    // Queries are still expected to succeed with status 200.
    let reviews_query = || {
        Query::builder()
            .body(json!({"query":"{ topProducts { reviews { id } } }","variables":{}}))
            .build()
    };
    for _ in 0..3 {
        let (_trace_id, response) = router.execute_query(reviews_query()).await;
        assert_eq!(response.status(), 200);
    }

    // Wait three seconds before reading the metrics.
    tokio::time::sleep(std::time::Duration::from_millis(3000)).await;

    let expected_error_metric = r#"apollo_router_cache_redis_errors_total{error_type="io",kind="entity",otel_scope_name="apollo/router"} 1"#;
    router
        .assert_metrics_contains(expected_error_metric, None)
        .await;

    router.graceful_shutdown().await;
}
/// Queries a `Status` object whose fields come from the `stuff` and `status` mock
/// subgraphs, using the `entity_key_complex.graphql` schema fixture with the entity cache
/// enabled, and checks that the response has no errors and contains all requested fields.
///
/// The test returns immediately when `graph_os_enabled()` is false.
#[tokio::test]
async fn complex_entity_key_cache() {
    if !graph_os_enabled() {
        return;
    }

    let stuff_subgraph = json!({
        "query": {
            "getStatus": {
                "id": "1",
                "items": [{"id": "i1", "name": "Item"}],
                "stuffDetails": "stuff we have"
            }
        }
    });
    let status_subgraph = json!({
        "entities": [{
            "__typename": "Status",
            "id": "1",
            "items": [{"id": "i1", "name": "Item"}],
            "statusDetails": "status details"
        }]
    });
    let subgraphs = json!({
        "stuff": stuff_subgraph,
        "status": status_subgraph
    });

    let mut config = base_config();
    config["experimental_mock_subgraphs"] = subgraphs;

    let mut router = apollo_router::TestHarness::builder()
        .schema(include_str!("./fixtures/entity_key_complex.graphql"))
        .configuration_json(config)
        .unwrap()
        .build_http_service()
        .await
        .unwrap();

    let query = r#"{
getStatus(id: "1") {
id
items { id name }
stuffDetails
statusDetails
}
}"#;
    let (_, response) =
        make_http_request::<graphql::Response>(&mut router, graphql_request(query).into()).await;
    assert!(response.errors.is_empty());

    let expected_data: serde_json_bytes::Value = json!({
        "getStatus": {
            "id": "1",
            "items": [{"id": "i1", "name": "Item"}],
            "stuffDetails": "stuff we have",
            "statusDetails": "status details"
        }
    })
    .into();
    assert_eq!(response.data, Some(expected_data));
}