hive_router/pipeline/
normalize.rs

1use std::hash::{Hash, Hasher};
2use std::sync::Arc;
3
4use hive_router_plan_executor::introspection::partition::partition_operation;
5use hive_router_plan_executor::projection::plan::FieldProjectionPlan;
6use hive_router_query_planner::ast::normalization::normalize_operation;
7use hive_router_query_planner::ast::operation::OperationDefinition;
8use ntex::web::HttpRequest;
9use xxhash_rust::xxh3::Xxh3;
10
11use crate::pipeline::error::{PipelineError, PipelineErrorFromAcceptHeader, PipelineErrorVariant};
12use crate::pipeline::execution_request::ExecutionRequest;
13use crate::pipeline::parser::GraphQLParserPayload;
14use crate::schema_state::{SchemaState, SupergraphData};
15use tracing::{error, trace};
16
17#[derive(Debug, Clone)]
18pub struct GraphQLNormalizationPayload {
19    /// The operation to execute, without introspection fields.
20    pub operation_for_plan: Arc<OperationDefinition>,
21    pub operation_for_introspection: Option<Arc<OperationDefinition>>,
22    pub root_type_name: &'static str,
23    pub projection_plan: Arc<Vec<FieldProjectionPlan>>,
24}
25
26#[inline]
27pub async fn normalize_request_with_cache(
28    req: &HttpRequest,
29    supergraph: &SupergraphData,
30    schema_state: &Arc<SchemaState>,
31    execution_params: &ExecutionRequest,
32    parser_payload: &GraphQLParserPayload,
33) -> Result<Arc<GraphQLNormalizationPayload>, PipelineError> {
34    let cache_key = match &execution_params.operation_name {
35        Some(operation_name) => {
36            let mut hasher = Xxh3::new();
37            execution_params.query.hash(&mut hasher);
38            operation_name.hash(&mut hasher);
39            hasher.finish()
40        }
41        None => parser_payload.cache_key,
42    };
43
44    match schema_state.normalize_cache.get(&cache_key).await {
45        Some(payload) => {
46            trace!(
47                "Found normalized GraphQL operation in cache (operation name={:?}): {}",
48                payload.operation_for_plan.name,
49                payload.operation_for_plan
50            );
51
52            Ok(payload)
53        }
54        None => match normalize_operation(
55            &supergraph.planner.supergraph,
56            &parser_payload.parsed_operation,
57            execution_params.operation_name.as_deref(),
58        ) {
59            Ok(doc) => {
60                trace!(
61                    "Successfully normalized GraphQL operation (operation name={:?}): {}",
62                    doc.operation_name,
63                    doc.operation
64                );
65
66                let operation = doc.operation;
67                let (root_type_name, projection_plan) =
68                    FieldProjectionPlan::from_operation(&operation, &supergraph.metadata);
69                let partitioned_operation = partition_operation(operation);
70
71                let payload = GraphQLNormalizationPayload {
72                    root_type_name,
73                    projection_plan: Arc::new(projection_plan),
74                    operation_for_plan: Arc::new(partitioned_operation.downstream_operation),
75                    operation_for_introspection: partitioned_operation
76                        .introspection_operation
77                        .map(Arc::new),
78                };
79                let payload_arc = Arc::new(payload);
80                schema_state
81                    .normalize_cache
82                    .insert(cache_key, payload_arc.clone())
83                    .await;
84
85                Ok(payload_arc)
86            }
87            Err(err) => {
88                error!("Failed to normalize GraphQL operation: {}", err);
89                trace!("{:?}", err);
90
91                Err(req.new_pipeline_error(PipelineErrorVariant::NormalizationError(err)))
92            }
93        },
94    }
95}