Skip to main content

hive_router/pipeline/
normalize.rs

1use std::hash::{Hash, Hasher};
2use std::sync::Arc;
3
4use hive_router_internal::telemetry::traces::spans::graphql::{
5    GraphQLNormalizeSpan, GraphQLSpanOperationIdentity,
6};
7use hive_router_plan_executor::introspection::partition::partition_operation;
8use hive_router_plan_executor::projection::plan::FieldProjectionPlan;
9use hive_router_query_planner::ast::normalization::normalize_operation;
10use hive_router_query_planner::ast::operation::OperationDefinition;
11use xxhash_rust::xxh3::Xxh3;
12
13use crate::pipeline::error::PipelineError;
14use crate::pipeline::execution_request::ExecutionRequest;
15use crate::pipeline::parser::GraphQLParserPayload;
16use crate::schema_state::{SchemaState, SupergraphData};
17use tracing::{trace, Instrument};
18
19#[derive(Debug, Clone)]
20pub struct GraphQLNormalizationPayload {
21    /// The operation to execute, without introspection fields.
22    pub operation_for_plan: Arc<OperationDefinition>,
23    pub operation_for_introspection: Option<Arc<OperationDefinition>>,
24    pub root_type_name: &'static str,
25    pub projection_plan: Arc<Vec<FieldProjectionPlan>>,
26    pub operation_indentity: OperationIdentity,
27}
28
/// Identifying attributes of a GraphQL operation: its optional name, its
/// operation type, and the hash of the document the client sent.
#[derive(Clone, Debug)]
pub struct OperationIdentity {
    /// Operation name, or `None` when no name is available.
    pub name: Option<String>,
    /// Operation type, kept as an owned string.
    pub operation_type: String,
    /// Hash of the original document sent to the router, by the client.
    pub client_document_hash: String,
}
36
37impl<'a> From<&'a OperationIdentity> for GraphQLSpanOperationIdentity<'a> {
38    fn from(op_id: &'a OperationIdentity) -> Self {
39        GraphQLSpanOperationIdentity {
40            name: op_id.name.as_deref(),
41            operation_type: &op_id.operation_type,
42            client_document_hash: &op_id.client_document_hash,
43        }
44    }
45}
46
47#[inline]
48pub async fn normalize_request_with_cache(
49    supergraph: &SupergraphData,
50    schema_state: &SchemaState,
51    execution_params: &ExecutionRequest,
52    parser_payload: &GraphQLParserPayload,
53) -> Result<Arc<GraphQLNormalizationPayload>, PipelineError> {
54    let normalize_span = GraphQLNormalizeSpan::new();
55    async {
56        let cache_key = match &execution_params.operation_name {
57            Some(operation_name) => {
58                let mut hasher = Xxh3::new();
59                execution_params.query.hash(&mut hasher);
60                operation_name.hash(&mut hasher);
61                hasher.finish()
62            }
63            None => parser_payload.cache_key,
64        };
65
66        match schema_state.normalize_cache.get(&cache_key).await {
67            Some(payload) => {
68                trace!(
69                    "Found normalized GraphQL operation in cache (operation name={:?}): {}",
70                    payload.operation_for_plan.name,
71                    payload.operation_for_plan
72                );
73                normalize_span.record_cache_hit(true);
74
75                Ok(payload)
76            }
77            None => {
78                normalize_span.record_cache_hit(false);
79                let doc = normalize_operation(
80                    &supergraph.planner.supergraph,
81                    &parser_payload.parsed_operation,
82                    execution_params.operation_name.as_deref(),
83                )?;
84
85                trace!(
86                    "Successfully normalized GraphQL operation (operation name={:?}): {}",
87                    doc.operation_name,
88                    doc.operation
89                );
90
91                let operation = doc.operation;
92                let (root_type_name, projection_plan) =
93                    FieldProjectionPlan::from_operation(&operation, &supergraph.metadata);
94                let partitioned_operation = partition_operation(operation);
95
96                let payload = GraphQLNormalizationPayload {
97                    root_type_name,
98                    projection_plan: Arc::new(projection_plan),
99                    operation_for_plan: Arc::new(partitioned_operation.downstream_operation),
100                    operation_for_introspection: partitioned_operation
101                        .introspection_operation
102                        .map(Arc::new),
103                    operation_indentity: OperationIdentity {
104                        name: doc.operation_name.clone(),
105                        operation_type: parser_payload.operation_type.clone(),
106                        client_document_hash: parser_payload.cache_key_string.clone(),
107                    },
108                };
109                let payload_arc = Arc::new(payload);
110                schema_state
111                    .normalize_cache
112                    .insert(cache_key, payload_arc.clone())
113                    .await;
114
115                Ok(payload_arc)
116            }
117        }
118    }
119    .instrument(normalize_span.clone())
120    .await
121}