uhg_custom_appollo_roouter/services/layers/persisted_queries/
mod.rs

1mod id_extractor;
2mod manifest_poller;
3
4#[cfg(test)]
5use std::sync::Arc;
6
7use http::HeaderValue;
8use http::StatusCode;
9use http::header::CACHE_CONTROL;
10use id_extractor::PersistedQueryIdExtractor;
11pub use manifest_poller::FullPersistedQueryOperationId;
12pub use manifest_poller::PersistedQueryManifest;
13pub(crate) use manifest_poller::PersistedQueryManifestPoller;
14use tower::BoxError;
15
16use super::query_analysis::ParsedDocument;
17use crate::Configuration;
18use crate::graphql::Error as GraphQLError;
19use crate::plugins::telemetry::CLIENT_NAME;
20use crate::services::SupergraphRequest;
21use crate::services::SupergraphResponse;
22
/// `Cache-Control` header value attached to error responses that must not be
/// cached.
const DONT_CACHE_RESPONSE_VALUE: &str = "private, no-cache, must-revalidate";
/// Context key read first when choosing the client name for a persisted query
/// lookup by ID; the telemetry client name is only used if this is absent.
const PERSISTED_QUERIES_CLIENT_NAME_CONTEXT_KEY: &str = "apollo_persisted_queries::client_name";
/// Context key that, when it holds `true`, makes this layer skip safelist
/// enforcement for the request.
const PERSISTED_QUERIES_SAFELIST_SKIP_ENFORCEMENT_CONTEXT_KEY: &str =
    "apollo_persisted_queries::safelist::skip_enforcement";
27
/// Marker inserted into the request context extensions once the operation body
/// has been filled in from the manifest by ID. Its presence makes the later
/// analyzed-query hook return early, without the safelist check.
struct UsedQueryIdFromManifest;
29
/// Router layer that resolves persisted query IDs to operation bodies and
/// applies the freeform GraphQL policy (safelisting / logging) to requests.
#[derive(Debug)]
pub(crate) struct PersistedQueryLayer {
    /// Manages polling uplink for persisted queries and caches the current
    /// value of the manifest and projected safelist. None if the layer is disabled.
    pub(crate) manifest_poller: Option<PersistedQueryManifestPoller>,
    /// Copied from `supergraph.introspection` in the configuration; when set,
    /// documents made up only of introspection operations bypass the safelist.
    introspection_enabled: bool,
}
37
38fn skip_enforcement(request: &SupergraphRequest) -> bool {
39    request
40        .context
41        .get(PERSISTED_QUERIES_SAFELIST_SKIP_ENFORCEMENT_CONTEXT_KEY)
42        .unwrap_or_default()
43        .unwrap_or(false)
44}
45
46impl PersistedQueryLayer {
47    /// Create a new [`PersistedQueryLayer`] from CLI options, YAML configuration,
48    /// and optionally, an existing persisted query manifest poller.
49    pub(crate) async fn new(configuration: &Configuration) -> Result<Self, BoxError> {
50        if configuration.persisted_queries.enabled {
51            Ok(Self {
52                manifest_poller: Some(
53                    PersistedQueryManifestPoller::new(configuration.clone()).await?,
54                ),
55                introspection_enabled: configuration.supergraph.introspection,
56            })
57        } else {
58            Ok(Self {
59                manifest_poller: None,
60                introspection_enabled: configuration.supergraph.introspection,
61            })
62        }
63    }
64
65    /// Run a request through the layer.
66    /// Takes care of:
67    /// 1) resolving a persisted query ID to a query body
68    /// 2) matching a freeform GraphQL request against persisted queries, optionally rejecting it based on configuration
69    /// 3) continuing to the next stage of the router
70    pub(crate) fn supergraph_request(
71        &self,
72        request: SupergraphRequest,
73    ) -> Result<SupergraphRequest, SupergraphResponse> {
74        if let Some(manifest_poller) = &self.manifest_poller {
75            if let Some(persisted_query_id) = PersistedQueryIdExtractor::extract_id(&request) {
76                self.replace_query_id_with_operation_body(
77                    request,
78                    manifest_poller,
79                    &persisted_query_id,
80                )
81            } else if skip_enforcement(&request) {
82                // A plugin told us to allow this, so let's skip to require_id check.
83                Ok(request)
84            } else if let Some(log_unknown) = manifest_poller.never_allows_freeform_graphql() {
85                // If we don't have an ID and we require an ID, return an error immediately,
86                if log_unknown {
87                    if let Some(operation_body) = request.supergraph_request.body().query.as_ref() {
88                        // Note: it's kind of inconsistent that if we require
89                        // IDs and skip_enforcement is set, we don't call
90                        // log_unknown_operation on freeform GraphQL, but if we
91                        // *don't* require IDs and skip_enforcement is set, we
92                        // *do* call log_unknown_operation on unknown
93                        // operations.
94                        log_unknown_operation(operation_body, false);
95                    }
96                }
97                Err(supergraph_err_pq_id_required(request))
98            } else {
99                // Let the freeform document (or complete lack of a document) be
100                // parsed by the query analysis layer. We'll be back with
101                // supergraph_request_with_analyzed_query soon to apply our
102                // safelist, if any.
103                Ok(request)
104            }
105        } else {
106            // PQ layer is entirely disabled.
107            Ok(request)
108        }
109    }
110
111    /// Places an operation body on a [`SupergraphRequest`] if it has been persisted
112    pub(crate) fn replace_query_id_with_operation_body(
113        &self,
114        mut request: SupergraphRequest,
115        manifest_poller: &PersistedQueryManifestPoller,
116        persisted_query_id: &str,
117    ) -> Result<SupergraphRequest, SupergraphResponse> {
118        if request.supergraph_request.body().query.is_some() {
119            if manifest_poller.augmenting_apq_with_pre_registration_and_no_safelisting() {
120                // Providing both a query string and an ID is how the clients of
121                // the APQ feature (which is incompatible with safelisting and
122                // log_unknown) register an operation. We let the APQ layer
123                // handle this instead of handling it ourselves. Note that we
124                // still may end up checking it against the safelist for the
125                // purpose of log_unknown!
126                Ok(request)
127            } else {
128                Err(supergraph_err_cannot_send_id_and_body_with_apq_disabled(
129                    request,
130                ))
131            }
132        } else {
133            // if there is no query, look up the persisted query in the manifest
134            // and put the body on the `supergraph_request`
135            if let Some(persisted_query_body) = manifest_poller.get_operation_body(
136                persisted_query_id,
137                // Use the first one of these that exists:
138                // - The PQL-specific context name entry
139                //   `apollo_persisted_queries::client_name` (which can be set
140                //   by router_service plugins)
141                // - The same name used by telemetry (ie, the value of the
142                //   header named by `telemetry.apollo.client_name_header`,
143                //   which defaults to `apollographql-client-name` by default)
144                request
145                    .context
146                    .get(PERSISTED_QUERIES_CLIENT_NAME_CONTEXT_KEY)
147                    .unwrap_or_default()
148                    .or_else(|| request.context.get(CLIENT_NAME).unwrap_or_default()),
149            ) {
150                let body = request.supergraph_request.body_mut();
151                body.query = Some(persisted_query_body);
152                body.extensions.remove("persistedQuery");
153                // Record that we actually used our ID, so we can skip the
154                // safelist check later.
155                request
156                    .context
157                    .extensions()
158                    .with_lock(|mut lock| lock.insert(UsedQueryIdFromManifest));
159                u64_counter!(
160                    "apollo.router.operations.persisted_queries",
161                    "Total requests with persisted queries enabled",
162                    1
163                );
164                Ok(request)
165            } else if manifest_poller.augmenting_apq_with_pre_registration_and_no_safelisting() {
166                // The query ID isn't in our manifest, but we have APQ enabled
167                // (and no safelisting) so we just let APQ handle it instead of
168                // returning an error. (We still might check against the
169                // safelist later for log_unknown!)
170                Ok(request)
171            } else {
172                u64_counter!(
173                    "apollo.router.operations.persisted_queries",
174                    "Total requests with persisted queries enabled",
175                    1,
176                    persisted_queries.not_found = true
177                );
178                // if APQ is not enabled, return an error indicating the query was not found
179                Err(supergraph_err_operation_not_found(
180                    request,
181                    persisted_query_id,
182                ))
183            }
184        }
185    }
186
187    pub(crate) async fn supergraph_request_with_analyzed_query(
188        &self,
189        request: SupergraphRequest,
190    ) -> Result<SupergraphRequest, SupergraphResponse> {
191        let manifest_poller = match &self.manifest_poller {
192            // PQ feature entirely disabled; just pass through.
193            None => return Ok(request),
194            Some(mp) => mp,
195        };
196
197        let operation_body = match request.supergraph_request.body().query.as_ref() {
198            // if the request doesn't have a `query` document, continue with normal execution, which
199            // will result in the normal no-operation error.
200            None => return Ok(request),
201            Some(ob) => ob,
202        };
203
204        let doc = {
205            if request
206                .context
207                .extensions()
208                .with_lock(|lock| lock.get::<UsedQueryIdFromManifest>().is_some())
209            {
210                return Ok(request);
211            }
212
213            let doc_opt = request
214                .context
215                .extensions()
216                .with_lock(|lock| lock.get::<ParsedDocument>().cloned());
217
218            match doc_opt {
219                None => {
220                    // For some reason, QueryAnalysisLayer didn't give us a document?
221                    return Err(supergraph_err(
222                        graphql_err(
223                            "MISSING_PARSED_OPERATION",
224                            "internal error: executable document missing",
225                        ),
226                        request,
227                        ErrorCacheStrategy::DontCache,
228                        StatusCode::INTERNAL_SERVER_ERROR,
229                    ));
230                }
231                Some(d) => d,
232            }
233        };
234
235        // If introspection is enabled in this server, all introspection
236        // requests are always allowed. (This means any document all of whose
237        // top-level fields in all operations (after spreading fragments) are
238        // __type/__schema/__typename.) We do want to make sure the document
239        // parsed properly before poking around at it, though.
240        if self.introspection_enabled
241            && doc
242                .executable
243                .operations
244                .iter()
245                .all(|op| op.is_introspection(&doc.executable))
246        {
247            return Ok(request);
248        }
249
250        let mut metric_attributes = vec![];
251        let freeform_graphql_action = manifest_poller.action_for_freeform_graphql(Ok(&doc.ast));
252        let skip_enforcement = skip_enforcement(&request);
253        let allow = skip_enforcement || freeform_graphql_action.should_allow;
254        if !allow {
255            metric_attributes.push(opentelemetry::KeyValue::new(
256                "persisted_queries.safelist.rejected.unknown".to_string(),
257                true,
258            ));
259        } else if !freeform_graphql_action.should_allow {
260            metric_attributes.push(opentelemetry::KeyValue::new(
261                "persisted_queries.safelist.enforcement_skipped".to_string(),
262                true,
263            ));
264        }
265        if freeform_graphql_action.should_log {
266            log_unknown_operation(operation_body, skip_enforcement);
267            metric_attributes.push(opentelemetry::KeyValue::new(
268                "persisted_queries.logged".to_string(),
269                true,
270            ));
271        }
272        u64_counter!(
273            "apollo.router.operations.persisted_queries",
274            "Total requests with persisted queries enabled",
275            1,
276            metric_attributes
277        );
278
279        if allow {
280            Ok(request)
281        } else {
282            Err(supergraph_err_operation_not_in_safelist(request))
283        }
284    }
285
286    pub(crate) fn all_operations(&self) -> Option<Vec<String>> {
287        self.manifest_poller
288            .as_ref()
289            .map(|poller| poller.get_all_operations())
290    }
291}
292
/// Emits a warning-level tracing event ("unknown operation") carrying the
/// operation body and whether safelist enforcement was skipped for it.
fn log_unknown_operation(operation_body: &str, enforcement_skipped: bool) {
    tracing::warn!(
        message = "unknown operation",
        operation_body,
        enforcement_skipped
    );
}
300
/// Whether an error response built by this layer gets a `Cache-Control`
/// header that forbids caching.
#[derive(Debug, Clone, Eq, PartialEq)]
enum ErrorCacheStrategy {
    /// No `Cache-Control` header is added to the response.
    Cache,
    /// The response carries the "don't cache" `Cache-Control` header.
    DontCache,
}
306
307impl ErrorCacheStrategy {
308    fn get_supergraph_response(
309        &self,
310        graphql_error: GraphQLError,
311        request: SupergraphRequest,
312        status_code: StatusCode,
313    ) -> SupergraphResponse {
314        let mut error_builder = SupergraphResponse::error_builder()
315            .error(graphql_error)
316            .status_code(status_code)
317            .context(request.context);
318
319        if matches!(self, Self::DontCache) {
320            // Persisted query errors (especially "not registered") need to be uncached, because
321            // if we accidentally end up in a state where clients are "ahead" of Routers,
322            // we don't want them to get "stuck" believing we don't know the PQ if we
323            // catch up afterwards.
324            error_builder = error_builder.header(
325                CACHE_CONTROL,
326                HeaderValue::from_static(DONT_CACHE_RESPONSE_VALUE),
327            );
328        }
329
330        error_builder.build().expect("response is valid")
331    }
332}
333
334fn graphql_err_operation_not_found(persisted_query_id: &str) -> GraphQLError {
335    graphql_err(
336        "PERSISTED_QUERY_NOT_IN_LIST",
337        &format!("Persisted query '{persisted_query_id}' not found in the persisted query list"),
338    )
339}
340
341fn supergraph_err_operation_not_found(
342    request: SupergraphRequest,
343    persisted_query_id: &str,
344) -> SupergraphResponse {
345    supergraph_err(
346        graphql_err_operation_not_found(persisted_query_id),
347        request,
348        ErrorCacheStrategy::DontCache,
349        StatusCode::NOT_FOUND,
350    )
351}
352
353fn graphql_err_cannot_send_id_and_body() -> GraphQLError {
354    graphql_err(
355        "CANNOT_SEND_PQ_ID_AND_BODY",
356        "Sending a persisted query ID and a body in the same request is disallowed",
357    )
358}
359
360fn supergraph_err_cannot_send_id_and_body_with_apq_disabled(
361    request: SupergraphRequest,
362) -> SupergraphResponse {
363    supergraph_err(
364        graphql_err_cannot_send_id_and_body(),
365        request,
366        ErrorCacheStrategy::DontCache,
367        StatusCode::BAD_REQUEST,
368    )
369}
370
371fn graphql_err_operation_not_in_safelist() -> GraphQLError {
372    graphql_err(
373        "QUERY_NOT_IN_SAFELIST",
374        "The operation body was not found in the persisted query safelist",
375    )
376}
377
378fn supergraph_err_operation_not_in_safelist(request: SupergraphRequest) -> SupergraphResponse {
379    supergraph_err(
380        graphql_err_operation_not_in_safelist(),
381        request,
382        ErrorCacheStrategy::DontCache,
383        StatusCode::FORBIDDEN,
384    )
385}
386
387fn graphql_err_pq_id_required() -> GraphQLError {
388    graphql_err(
389        "PERSISTED_QUERY_ID_REQUIRED",
390        "This endpoint does not allow freeform GraphQL requests; operations must be sent by ID in the persisted queries GraphQL extension.",
391    )
392}
393
394fn supergraph_err_pq_id_required(request: SupergraphRequest) -> SupergraphResponse {
395    supergraph_err(
396        graphql_err_pq_id_required(),
397        request,
398        ErrorCacheStrategy::Cache,
399        StatusCode::BAD_REQUEST,
400    )
401}
402
403fn graphql_err(code: &str, message: &str) -> GraphQLError {
404    GraphQLError::builder()
405        .extension_code(code)
406        .message(message)
407        .build()
408}
409
410fn supergraph_err(
411    graphql_error: GraphQLError,
412    request: SupergraphRequest,
413    cache_strategy: ErrorCacheStrategy,
414    status_code: StatusCode,
415) -> SupergraphResponse {
416    cache_strategy.get_supergraph_response(graphql_error, request, status_code)
417}
418
419#[cfg(test)]
420mod tests {
421    use std::collections::HashMap;
422    use std::time::Duration;
423
424    use maplit::hashmap;
425    use serde_json::json;
426    use tracing::instrument::WithSubscriber;
427
428    use super::*;
429    use crate::Context;
430    use crate::assert_snapshot_subscriber;
431    use crate::configuration::Apq;
432    use crate::configuration::PersistedQueries;
433    use crate::configuration::PersistedQueriesSafelist;
434    use crate::configuration::Supergraph;
435    use crate::metrics::FutureMetricsExt;
436    use crate::services::layers::persisted_queries::manifest_poller::FreeformGraphQLBehavior;
437    use crate::services::layers::query_analysis::QueryAnalysisLayer;
438    use crate::spec::Schema;
439    use crate::test_harness::mocks::persisted_queries::*;
440
441    #[tokio::test(flavor = "multi_thread")]
442    async fn disabled_pq_layer_has_no_poller() {
443        let (_mock_guard, uplink_config) = mock_empty_pq_uplink().await;
444        let pq_layer = PersistedQueryLayer::new(
445            &Configuration::fake_builder()
446                .persisted_query(PersistedQueries::builder().enabled(false).build())
447                .uplink(uplink_config)
448                .build()
449                .unwrap(),
450        )
451        .await
452        .unwrap();
453        assert!(pq_layer.manifest_poller.is_none());
454    }
455
456    #[tokio::test(flavor = "multi_thread")]
457    async fn enabled_pq_layer_has_poller() {
458        let (_mock_guard, uplink_config) = mock_empty_pq_uplink().await;
459        let pq_layer = PersistedQueryLayer::new(
460            &Configuration::fake_builder()
461                .persisted_query(PersistedQueries::builder().enabled(true).build())
462                .uplink(uplink_config)
463                .build()
464                .unwrap(),
465        )
466        .await
467        .unwrap();
468        assert!(pq_layer.manifest_poller.is_some())
469    }
470
471    #[tokio::test]
472    async fn poller_waits_to_start() {
473        let (_id, _body, manifest) = fake_manifest();
474        let delay = Duration::from_secs(2);
475        let (_mock_guard, uplink_config) = mock_pq_uplink_with_delay(&manifest, delay).await;
476        let now = tokio::time::Instant::now();
477
478        assert!(
479            PersistedQueryManifestPoller::new(
480                Configuration::fake_builder()
481                    .uplink(uplink_config)
482                    .build()
483                    .unwrap(),
484            )
485            .await
486            .is_ok()
487        );
488
489        assert!(now.elapsed() >= delay);
490    }
491
492    #[tokio::test(flavor = "multi_thread")]
493    async fn enabled_pq_layer_can_run_pq() {
494        let (id, body, manifest) = fake_manifest();
495
496        let (_mock_guard, uplink_config) = mock_pq_uplink(&manifest).await;
497
498        let pq_layer = PersistedQueryLayer::new(
499            &Configuration::fake_builder()
500                .persisted_query(PersistedQueries::builder().enabled(true).build())
501                .uplink(uplink_config)
502                .build()
503                .unwrap(),
504        )
505        .await
506        .unwrap();
507        let incoming_request = SupergraphRequest::fake_builder()
508            .extension("persistedQuery", json!({"version": 1, "sha256Hash": id}))
509            .build()
510            .unwrap();
511
512        assert!(incoming_request.supergraph_request.body().query.is_none());
513
514        let result = pq_layer.supergraph_request(incoming_request);
515        let request = result
516            .ok()
517            .expect("pq layer returned response instead of putting the query on the request");
518        assert_eq!(request.supergraph_request.body().query, Some(body));
519    }
520
521    #[tokio::test(flavor = "multi_thread")]
522    async fn enabled_pq_layer_with_client_names() {
523        let (_mock_guard, uplink_config) = mock_pq_uplink(&hashmap! {
524            FullPersistedQueryOperationId {
525                operation_id: "both-plain-and-cliented".to_string(),
526                client_name: None,
527            } => "query { bpac_no_client: __typename }".to_string(),
528            FullPersistedQueryOperationId {
529                operation_id: "both-plain-and-cliented".to_string(),
530                client_name: Some("web".to_string()),
531            } => "query { bpac_web_client: __typename }".to_string(),
532            FullPersistedQueryOperationId {
533                operation_id: "only-cliented".to_string(),
534                client_name: Some("web".to_string()),
535            } => "query { oc_web_client: __typename }".to_string(),
536        })
537        .await;
538
539        let pq_layer = PersistedQueryLayer::new(
540            &Configuration::fake_builder()
541                .persisted_query(PersistedQueries::builder().enabled(true).build())
542                .uplink(uplink_config)
543                .build()
544                .unwrap(),
545        )
546        .await
547        .unwrap();
548
549        let map_to_query = |operation_id: &str, client_name: Option<&str>| -> Option<String> {
550            let context = Context::new();
551            if let Some(client_name) = client_name {
552                context
553                    .insert(
554                        PERSISTED_QUERIES_CLIENT_NAME_CONTEXT_KEY,
555                        client_name.to_string(),
556                    )
557                    .unwrap();
558            }
559
560            let incoming_request = SupergraphRequest::fake_builder()
561                .extension(
562                    "persistedQuery",
563                    json!({"version": 1, "sha256Hash": operation_id.to_string()}),
564                )
565                .context(context)
566                .build()
567                .unwrap();
568
569            pq_layer
570                .supergraph_request(incoming_request)
571                .ok()
572                .expect("pq layer returned response instead of putting the query on the request")
573                .supergraph_request
574                .body()
575                .query
576                .clone()
577        };
578
579        assert_eq!(
580            map_to_query("both-plain-and-cliented", None),
581            Some("query { bpac_no_client: __typename }".to_string())
582        );
583        assert_eq!(
584            map_to_query("both-plain-and-cliented", Some("not-web")),
585            Some("query { bpac_no_client: __typename }".to_string())
586        );
587        assert_eq!(
588            map_to_query("both-plain-and-cliented", Some("web")),
589            Some("query { bpac_web_client: __typename }".to_string())
590        );
591        assert_eq!(
592            map_to_query("only-cliented", Some("web")),
593            Some("query { oc_web_client: __typename }".to_string())
594        );
595        assert_eq!(map_to_query("only-cliented", None), None);
596        assert_eq!(map_to_query("only-cliented", Some("not-web")), None);
597    }
598
599    #[tokio::test(flavor = "multi_thread")]
600    async fn pq_layer_passes_on_to_apq_layer_when_id_not_found() {
601        let (_id, _body, manifest) = fake_manifest();
602
603        let (_mock_guard, uplink_config) = mock_pq_uplink(&manifest).await;
604
605        let pq_layer = PersistedQueryLayer::new(
606            &Configuration::fake_builder()
607                .persisted_query(PersistedQueries::builder().enabled(true).build())
608                .apq(Apq::fake_builder().enabled(true).build())
609                .uplink(uplink_config)
610                .build()
611                .unwrap(),
612        )
613        .await
614        .unwrap();
615        let incoming_request = SupergraphRequest::fake_builder()
616            .extension(
617                "persistedQuery",
618                json!({"version": 1, "sha256Hash": "this-id-is-invalid"}),
619            )
620            .build()
621            .unwrap();
622
623        assert!(incoming_request.supergraph_request.body().query.is_none());
624
625        let result = pq_layer.supergraph_request(incoming_request);
626        let request = result
627            .ok()
628            .expect("pq layer returned response instead of continuing to APQ layer");
629        assert!(request.supergraph_request.body().query.is_none());
630    }
631
632    #[tokio::test(flavor = "multi_thread")]
633    async fn pq_layer_errors_when_id_not_found_and_apq_disabled() {
634        let (_id, _body, manifest) = fake_manifest();
635
636        let (_mock_guard, uplink_config) = mock_pq_uplink(&manifest).await;
637
638        let pq_layer = PersistedQueryLayer::new(
639            &Configuration::fake_builder()
640                .persisted_query(PersistedQueries::builder().enabled(true).build())
641                .apq(Apq::fake_builder().enabled(false).build())
642                .uplink(uplink_config)
643                .build()
644                .unwrap(),
645        )
646        .await
647        .unwrap();
648        let invalid_id = "this-id-is-invalid";
649        let incoming_request = SupergraphRequest::fake_builder()
650            .extension(
651                "persistedQuery",
652                json!({"version": 1, "sha256Hash": invalid_id}),
653            )
654            .build()
655            .unwrap();
656
657        assert!(incoming_request.supergraph_request.body().query.is_none());
658
659        let mut supergraph_response = pq_layer
660            .supergraph_request(incoming_request)
661            .expect_err("pq layer returned request instead of returning an error response");
662        assert_eq!(supergraph_response.response.status(), 404);
663        let response = supergraph_response
664            .next_response()
665            .await
666            .expect("could not get response from pq layer");
667        assert_eq!(
668            response.errors,
669            vec![graphql_err_operation_not_found(invalid_id)]
670        );
671    }
672
673    #[tokio::test(flavor = "multi_thread")]
674    async fn enabled_apq_configuration_tracked_in_pq_layer() {
675        let (_mock_guard, uplink_config) = mock_empty_pq_uplink().await;
676        let pq_layer = PersistedQueryLayer::new(
677            &Configuration::fake_builder()
678                .apq(Apq::fake_builder().enabled(true).build())
679                .persisted_query(PersistedQueries::builder().enabled(true).build())
680                .uplink(uplink_config)
681                .build()
682                .unwrap(),
683        )
684        .await
685        .unwrap();
686        assert!(
687            pq_layer
688                .manifest_poller
689                .unwrap()
690                .augmenting_apq_with_pre_registration_and_no_safelisting()
691        )
692    }
693
694    #[tokio::test(flavor = "multi_thread")]
695    async fn disabled_apq_configuration_tracked_in_pq_layer() {
696        let (_mock_guard, uplink_config) = mock_empty_pq_uplink().await;
697        let pq_layer = PersistedQueryLayer::new(
698            &Configuration::fake_builder()
699                .apq(Apq::fake_builder().enabled(false).build())
700                .uplink(uplink_config)
701                .persisted_query(PersistedQueries::builder().enabled(true).build())
702                .build()
703                .unwrap(),
704        )
705        .await
706        .unwrap();
707        assert!(
708            !pq_layer
709                .manifest_poller
710                .unwrap()
711                .augmenting_apq_with_pre_registration_and_no_safelisting()
712        )
713    }
714
715    #[tokio::test(flavor = "multi_thread")]
716    async fn enabled_safelist_configuration_tracked_in_pq_layer() {
717        let safelist_config = PersistedQueriesSafelist::builder().enabled(true).build();
718        let (_mock_guard, uplink_config) = mock_empty_pq_uplink().await;
719        let pq_layer = PersistedQueryLayer::new(
720            &Configuration::fake_builder()
721                .persisted_query(
722                    PersistedQueries::builder()
723                        .enabled(true)
724                        .safelist(safelist_config)
725                        .build(),
726                )
727                .uplink(uplink_config)
728                .apq(Apq::fake_builder().enabled(false).build())
729                .build()
730                .unwrap(),
731        )
732        .await
733        .unwrap();
734        assert!(matches!(
735            pq_layer
736                .manifest_poller
737                .unwrap()
738                .state
739                .read()
740                .unwrap()
741                .freeform_graphql_behavior,
742            FreeformGraphQLBehavior::AllowIfInSafelist { .. }
743        ))
744    }
745
746    async fn run_first_two_layers(
747        pq_layer: &PersistedQueryLayer,
748        query_analysis_layer: &QueryAnalysisLayer,
749        body: &str,
750        skip_enforcement: bool,
751    ) -> SupergraphRequest {
752        let context = Context::new();
753        if skip_enforcement {
754            context
755                .insert(
756                    PERSISTED_QUERIES_SAFELIST_SKIP_ENFORCEMENT_CONTEXT_KEY,
757                    true,
758                )
759                .unwrap();
760        }
761
762        let incoming_request = SupergraphRequest::fake_builder()
763            .query(body)
764            .context(context)
765            .build()
766            .unwrap();
767
768        assert!(incoming_request.supergraph_request.body().query.is_some());
769
770        // The initial hook won't block us --- that waits until after we've parsed
771        // the operation.
772        let updated_request = pq_layer
773            .supergraph_request(incoming_request)
774            .ok()
775            .expect("pq layer returned error response instead of returning a request");
776        query_analysis_layer
777            .supergraph_request(updated_request)
778            .await
779            .ok()
780            .expect("QA layer returned error response instead of returning a request")
781    }
782
783    async fn denied_by_safelist(
784        pq_layer: &PersistedQueryLayer,
785        query_analysis_layer: &QueryAnalysisLayer,
786        body: &str,
787        log_unknown: bool,
788        counter_value: u64,
789    ) {
790        let request_with_analyzed_query =
791            run_first_two_layers(pq_layer, query_analysis_layer, body, false).await;
792
793        let mut supergraph_response = pq_layer
794            .supergraph_request_with_analyzed_query(request_with_analyzed_query)
795            .await
796            .expect_err(
797                "pq layer second hook returned request instead of returning an error response",
798            );
799        assert_eq!(supergraph_response.response.status(), 403);
800        let response = supergraph_response
801            .next_response()
802            .await
803            .expect("could not get response from pq layer");
804        assert_eq!(
805            response.errors,
806            vec![graphql_err_operation_not_in_safelist()]
807        );
808        let mut metric_attributes = vec![opentelemetry::KeyValue::new(
809            "persisted_queries.safelist.rejected.unknown".to_string(),
810            true,
811        )];
812        if log_unknown {
813            metric_attributes.push(opentelemetry::KeyValue::new(
814                "persisted_queries.logged".to_string(),
815                true,
816            ));
817        }
818        assert_counter!(
819            "apollo.router.operations.persisted_queries",
820            counter_value,
821            &metric_attributes
822        );
823    }
824
825    async fn allowed_by_safelist(
826        pq_layer: &PersistedQueryLayer,
827        query_analysis_layer: &QueryAnalysisLayer,
828        body: &str,
829        log_unknown: bool,
830        skip_enforcement: bool,
831        counter_value: u64,
832    ) {
833        let request_with_analyzed_query =
834            run_first_two_layers(pq_layer, query_analysis_layer, body, skip_enforcement).await;
835
836        pq_layer
837            .supergraph_request_with_analyzed_query(request_with_analyzed_query)
838            .await
839            .ok()
840            .expect("pq layer second hook returned error response instead of returning a request");
841
842        let mut metric_attributes = vec![];
843        if skip_enforcement {
844            metric_attributes.push(opentelemetry::KeyValue::new(
845                "persisted_queries.safelist.enforcement_skipped".to_string(),
846                true,
847            ));
848            if log_unknown {
849                metric_attributes.push(opentelemetry::KeyValue::new(
850                    "persisted_queries.logged".to_string(),
851                    true,
852                ));
853            }
854        }
855
856        assert_counter!(
857            "apollo.router.operations.persisted_queries",
858            counter_value,
859            &metric_attributes
860        );
861    }
862
863    async fn pq_layer_freeform_graphql_with_safelist(log_unknown: bool) {
864        async move {
865            let manifest = HashMap::from([
866                (
867                    FullPersistedQueryOperationId {
868                        operation_id: "valid-syntax".to_string(),
869                        client_name: None,
870                    },
871                    "fragment A on Query { me { id } }    query SomeOp { ...A ...B }    fragment,,, B on Query{me{name,username}  } # yeah"
872                        .to_string(),
873                ),
874                (
875                    FullPersistedQueryOperationId {
876                        operation_id: "invalid-syntax".to_string(),
877                        client_name: None,
878                    },
879                    "}}}".to_string(),
880                ),
881            ]);
882
883            let (_mock_guard, uplink_config) = mock_pq_uplink(&manifest).await;
884
885            let config = Configuration::fake_builder()
886                .persisted_query(
887                    PersistedQueries::builder()
888                        .enabled(true)
889                        .safelist(PersistedQueriesSafelist::builder().enabled(true).build())
890                        .log_unknown(log_unknown)
891                        .build(),
892                )
893                .uplink(uplink_config)
894                .apq(Apq::fake_builder().enabled(false).build())
895                .supergraph(Supergraph::fake_builder().introspection(true).build())
896                .build()
897                .unwrap();
898
899            let pq_layer = PersistedQueryLayer::new(&config).await.unwrap();
900
901            let schema = Arc::new(Schema::parse(include_str!("../../../testdata/supergraph.graphql"), &Default::default()).unwrap());
902
903            let query_analysis_layer = QueryAnalysisLayer::new(schema, Arc::new(config)).await;
904
905            // A random query is blocked.
906            denied_by_safelist(
907                &pq_layer,
908                &query_analysis_layer,
909                "query SomeQuery { me { id } }",
910                log_unknown,
911                1,
912            ).await;
913
914            // But it is allowed with skip_enforcement set.
915            allowed_by_safelist(
916                &pq_layer,
917                &query_analysis_layer,
918                "query SomeQuery { me { id } }",
919                log_unknown,
920                true,
921                1,
922            ).await;
923
924            // The exact string from the manifest is allowed.
925            allowed_by_safelist(
926                &pq_layer,
927                &query_analysis_layer,
928                "fragment A on Query { me { id } }    query SomeOp { ...A ...B }    fragment,,, B on Query{me{name,username}  } # yeah",
929                log_unknown,
930                false,
931                1,
932            )
933            .await;
934
935            // Reordering definitions and reformatting a bit matches.
936            allowed_by_safelist(
937                &pq_layer,
938                &query_analysis_layer,
939                "#comment\n  fragment, B on Query  , { me{name    username} }    query SomeOp {  ...A ...B }  fragment    \nA on Query { me{ id} }",
940                log_unknown,
941                false,
942                2,
943            )
944            .await;
945
946            // Reordering fields does not match!
947            denied_by_safelist(
948                &pq_layer,
949                &query_analysis_layer,
950                "fragment A on Query { me { id } }    query SomeOp { ...A ...B }    fragment,,, B on Query{me{username,name}  } # yeah",
951                log_unknown,
952                2,
953            )
954            .await;
955
956            // Introspection queries are allowed (even using fragments and aliases), because
957            // introspection is enabled.
958            allowed_by_safelist(
959                &pq_layer,
960                &query_analysis_layer,
961                r#"fragment F on Query { __typename foo: __schema { __typename } } query Q { __type(name: "foo") { name } ...F }"#,
962                log_unknown,
963                false,
964                // Note that introspection queries don't actually interact with the PQ machinery enough
965                // to update this metric, for better or for worse.
966                2,
967            )
968            .await;
969
970            // Multiple spreads of the same fragment are also allowed
971            // (https://github.com/apollographql/apollo-rs/issues/613)
972            allowed_by_safelist(
973                &pq_layer,
974                &query_analysis_layer,
975                r#"fragment F on Query { __typename foo: __schema { __typename } } query Q { __type(name: "foo") { name } ...F ...F }"#,
976                log_unknown,
977                false,
978                // Note that introspection queries don't actually interact with the PQ machinery enough
979                // to update this metric, for better or for worse.
980                2,
981            )
982            .await;
983
984            // But adding any top-level non-introspection field is enough to make it not count as introspection.
985            denied_by_safelist(
986                &pq_layer,
987                &query_analysis_layer,
988                r#"fragment F on Query { __typename foo: __schema { __typename } me { id } } query Q { __type(name: "foo") { name } ...F }"#,
989                log_unknown,
990                3,
991            )
992            .await;
993        }
994        .with_metrics()
995        .await;
996    }
997
998    #[tokio::test(flavor = "multi_thread")]
999    async fn pq_layer_freeform_graphql_with_safelist_log_unknown_false() {
1000        pq_layer_freeform_graphql_with_safelist(false).await;
1001    }
1002
1003    #[tokio::test(flavor = "multi_thread")]
1004    async fn pq_layer_freeform_graphql_with_safelist_log_unknown_true() {
1005        async {
1006            pq_layer_freeform_graphql_with_safelist(true).await;
1007        }
1008        .with_subscriber(assert_snapshot_subscriber!())
1009        .await
1010    }
1011
1012    #[tokio::test(flavor = "multi_thread")]
1013    async fn pq_layer_rejects_invalid_ids_with_safelisting_enabled() {
1014        let (_id, _body, manifest) = fake_manifest();
1015
1016        let (_mock_guard, uplink_config) = mock_pq_uplink(&manifest).await;
1017
1018        let safelist_config = PersistedQueriesSafelist::builder().enabled(true).build();
1019        let pq_layer = PersistedQueryLayer::new(
1020            &Configuration::fake_builder()
1021                .persisted_query(
1022                    PersistedQueries::builder()
1023                        .enabled(true)
1024                        .safelist(safelist_config)
1025                        .build(),
1026                )
1027                .uplink(uplink_config)
1028                .apq(Apq::fake_builder().enabled(false).build())
1029                .build()
1030                .unwrap(),
1031        )
1032        .await
1033        .unwrap();
1034        let invalid_id = "this-id-is-invalid";
1035        let incoming_request = SupergraphRequest::fake_builder()
1036            .extension(
1037                "persistedQuery",
1038                json!({"version": 1, "sha256Hash": invalid_id}),
1039            )
1040            .build()
1041            .unwrap();
1042
1043        assert!(incoming_request.supergraph_request.body().query.is_none());
1044
1045        let result = pq_layer.supergraph_request(incoming_request);
1046        let response = result
1047            .expect_err("pq layer returned request instead of returning an error response")
1048            .next_response()
1049            .await
1050            .expect("could not get response from pq layer");
1051        assert_eq!(
1052            response.errors,
1053            vec![graphql_err_operation_not_found(invalid_id)]
1054        );
1055    }
1056
1057    #[tokio::test(flavor = "multi_thread")]
1058    async fn apq_and_pq_safelisting_is_invalid_config() {
1059        let (_mock_guard, uplink_config) = mock_empty_pq_uplink().await;
1060        let safelist_config = PersistedQueriesSafelist::builder().enabled(true).build();
1061        assert!(
1062            Configuration::fake_builder()
1063                .persisted_query(
1064                    PersistedQueries::builder()
1065                        .enabled(true)
1066                        .safelist(safelist_config)
1067                        .build(),
1068                )
1069                .apq(Apq::fake_builder().enabled(true).build())
1070                .uplink(uplink_config)
1071                .build()
1072                .is_err()
1073        );
1074    }
1075
1076    #[tokio::test(flavor = "multi_thread")]
1077    async fn require_id_disabled_by_default_with_safelisting_enabled_in_pq_layer() {
1078        let safelist_config = PersistedQueriesSafelist::builder().enabled(true).build();
1079        let (_mock_guard, uplink_config) = mock_empty_pq_uplink().await;
1080        let pq_layer = PersistedQueryLayer::new(
1081            &Configuration::fake_builder()
1082                .persisted_query(
1083                    PersistedQueries::builder()
1084                        .enabled(true)
1085                        .safelist(safelist_config)
1086                        .build(),
1087                )
1088                .apq(Apq::fake_builder().enabled(false).build())
1089                .uplink(uplink_config)
1090                .build()
1091                .unwrap(),
1092        )
1093        .await
1094        .unwrap();
1095        assert!(matches!(
1096            pq_layer
1097                .manifest_poller
1098                .unwrap()
1099                .state
1100                .read()
1101                .unwrap()
1102                .freeform_graphql_behavior,
1103            FreeformGraphQLBehavior::AllowIfInSafelist { .. }
1104        ))
1105    }
1106
1107    #[tokio::test(flavor = "multi_thread")]
1108    async fn safelisting_require_id_can_be_enabled_in_pq_layer() {
1109        let safelist_config = PersistedQueriesSafelist::builder()
1110            .enabled(true)
1111            .require_id(true)
1112            .build();
1113        let (_mock_guard, uplink_config) = mock_empty_pq_uplink().await;
1114        let pq_layer = PersistedQueryLayer::new(
1115            &Configuration::fake_builder()
1116                .persisted_query(
1117                    PersistedQueries::builder()
1118                        .enabled(true)
1119                        .safelist(safelist_config)
1120                        .build(),
1121                )
1122                .apq(Apq::fake_builder().enabled(false).build())
1123                .uplink(uplink_config)
1124                .build()
1125                .unwrap(),
1126        )
1127        .await
1128        .unwrap();
1129        assert!(
1130            pq_layer
1131                .manifest_poller
1132                .unwrap()
1133                .never_allows_freeform_graphql()
1134                .is_some()
1135        )
1136    }
1137
1138    #[tokio::test(flavor = "multi_thread")]
1139    async fn safelisting_require_id_rejects_freeform_graphql_in_pq_layer() {
1140        let safelist_config = PersistedQueriesSafelist::builder()
1141            .enabled(true)
1142            .require_id(true)
1143            .build();
1144        let (_mock_guard, uplink_config) = mock_empty_pq_uplink().await;
1145        let pq_layer = PersistedQueryLayer::new(
1146            &Configuration::fake_builder()
1147                .persisted_query(
1148                    PersistedQueries::builder()
1149                        .enabled(true)
1150                        .safelist(safelist_config)
1151                        .build(),
1152                )
1153                .apq(Apq::fake_builder().enabled(false).build())
1154                .uplink(uplink_config)
1155                .build()
1156                .unwrap(),
1157        )
1158        .await
1159        .unwrap();
1160
1161        let incoming_request = SupergraphRequest::fake_builder()
1162            .query("query { typename }")
1163            .build()
1164            .unwrap();
1165
1166        assert!(incoming_request.supergraph_request.body().query.is_some());
1167
1168        let mut supergraph_response = pq_layer
1169            .supergraph_request(incoming_request)
1170            .expect_err("pq layer returned request instead of returning an error response");
1171        assert_eq!(supergraph_response.response.status(), 400);
1172        let response = supergraph_response
1173            .next_response()
1174            .await
1175            .expect("could not get response from pq layer");
1176        assert_eq!(response.errors, vec![graphql_err_pq_id_required()]);
1177
1178        // Try again skipping enforcement.
1179        let context = Context::new();
1180        context
1181            .insert(
1182                PERSISTED_QUERIES_SAFELIST_SKIP_ENFORCEMENT_CONTEXT_KEY,
1183                true,
1184            )
1185            .unwrap();
1186        let incoming_request = SupergraphRequest::fake_builder()
1187            .query("query { typename }")
1188            .context(context)
1189            .build()
1190            .unwrap();
1191        assert!(incoming_request.supergraph_request.body().query.is_some());
1192        assert!(pq_layer.supergraph_request(incoming_request).is_ok());
1193    }
1194
1195    #[tokio::test(flavor = "multi_thread")]
1196    async fn safelisting_disabled_by_default_in_pq_layer() {
1197        let (_mock_guard, uplink_config) = mock_empty_pq_uplink().await;
1198        let pq_layer = PersistedQueryLayer::new(
1199            &Configuration::fake_builder()
1200                .persisted_query(PersistedQueries::builder().enabled(true).build())
1201                .apq(Apq::fake_builder().enabled(false).build())
1202                .uplink(uplink_config)
1203                .build()
1204                .unwrap(),
1205        )
1206        .await
1207        .unwrap();
1208        assert!(matches!(
1209            pq_layer
1210                .manifest_poller
1211                .unwrap()
1212                .state
1213                .read()
1214                .unwrap()
1215                .freeform_graphql_behavior,
1216            FreeformGraphQLBehavior::AllowAll { apq_enabled: false }
1217        ))
1218    }
1219
1220    #[tokio::test(flavor = "multi_thread")]
1221    async fn disabled_safelist_configuration_tracked_in_pq_layer() {
1222        let (_mock_guard, uplink_config) = mock_empty_pq_uplink().await;
1223        let safelist_config = PersistedQueriesSafelist::builder().enabled(false).build();
1224        let pq_layer = PersistedQueryLayer::new(
1225            &Configuration::fake_builder()
1226                .persisted_query(
1227                    PersistedQueries::builder()
1228                        .enabled(true)
1229                        .safelist(safelist_config)
1230                        .build(),
1231                )
1232                .uplink(uplink_config)
1233                .build()
1234                .unwrap(),
1235        )
1236        .await
1237        .unwrap();
1238        assert!(matches!(
1239            pq_layer
1240                .manifest_poller
1241                .unwrap()
1242                .state
1243                .read()
1244                .unwrap()
1245                .freeform_graphql_behavior,
1246            FreeformGraphQLBehavior::AllowAll { apq_enabled: true }
1247        ))
1248    }
1249
1250    #[tokio::test(flavor = "multi_thread")]
1251    async fn can_pass_different_body_from_published_pq_id_with_apq_enabled() {
1252        let (id, _body, manifest) = fake_manifest();
1253        let (_mock_guard, uplink_config) = mock_pq_uplink(&manifest).await;
1254        let pq_layer = PersistedQueryLayer::new(
1255            &Configuration::fake_builder()
1256                .persisted_query(PersistedQueries::builder().enabled(true).build())
1257                .apq(Apq::fake_builder().enabled(true).build())
1258                .uplink(uplink_config)
1259                .build()
1260                .unwrap(),
1261        )
1262        .await
1263        .unwrap();
1264        let incoming_request = SupergraphRequest::fake_builder()
1265            .extension("persistedQuery", json!({"version": 1, "sha256Hash": id}))
1266            .query("invalid body")
1267            .build()
1268            .unwrap();
1269
1270        assert!(incoming_request.supergraph_request.body().query.is_some());
1271
1272        let result = pq_layer.supergraph_request(incoming_request);
1273        assert!(result.is_ok())
1274    }
1275
1276    #[tokio::test(flavor = "multi_thread")]
1277    async fn cannot_pass_different_body_as_published_pq_id_with_apq_disabled() {
1278        let (id, _body, manifest) = fake_manifest();
1279        let (_mock_guard, uplink_config) = mock_pq_uplink(&manifest).await;
1280        let pq_layer = PersistedQueryLayer::new(
1281            &Configuration::fake_builder()
1282                .persisted_query(PersistedQueries::builder().enabled(true).build())
1283                .apq(Apq::fake_builder().enabled(false).build())
1284                .uplink(uplink_config)
1285                .build()
1286                .unwrap(),
1287        )
1288        .await
1289        .unwrap();
1290        let incoming_request = SupergraphRequest::fake_builder()
1291            .extension("persistedQuery", json!({"version": 1, "sha256Hash": id}))
1292            .query("invalid body")
1293            .build()
1294            .unwrap();
1295
1296        assert!(incoming_request.supergraph_request.body().query.is_some());
1297
1298        let mut supergraph_response = pq_layer
1299            .supergraph_request(incoming_request)
1300            .expect_err("pq layer returned request instead of returning an error response");
1301        assert_eq!(supergraph_response.response.status(), 400);
1302        let response = supergraph_response
1303            .next_response()
1304            .await
1305            .expect("could not get response from pq layer");
1306        assert_eq!(response.errors, vec![graphql_err_cannot_send_id_and_body()]);
1307    }
1308
1309    #[tokio::test(flavor = "multi_thread")]
1310    async fn cannot_pass_same_body_as_published_pq_id_with_apq_disabled() {
1311        let (id, body, manifest) = fake_manifest();
1312        let (_mock_guard, uplink_config) = mock_pq_uplink(&manifest).await;
1313        let pq_layer = PersistedQueryLayer::new(
1314            &Configuration::fake_builder()
1315                .persisted_query(PersistedQueries::builder().enabled(true).build())
1316                .apq(Apq::fake_builder().enabled(false).build())
1317                .uplink(uplink_config)
1318                .build()
1319                .unwrap(),
1320        )
1321        .await
1322        .unwrap();
1323        let incoming_request = SupergraphRequest::fake_builder()
1324            .extension("persistedQuery", json!({"version": 1, "sha256Hash": id}))
1325            .query(body)
1326            .build()
1327            .unwrap();
1328
1329        assert!(incoming_request.supergraph_request.body().query.is_some());
1330
1331        let mut supergraph_response = pq_layer
1332            .supergraph_request(incoming_request)
1333            .expect_err("pq layer returned request instead of returning an error response");
1334        assert_eq!(supergraph_response.response.status(), 400);
1335        let response = supergraph_response
1336            .next_response()
1337            .await
1338            .expect("could not get response from pq layer");
1339        assert_eq!(response.errors, vec![graphql_err_cannot_send_id_and_body()]);
1340    }
1341}