hive_router/pipeline/
normalize.rs1use std::hash::{Hash, Hasher};
2use std::sync::Arc;
3
4use hive_router_internal::telemetry::traces::spans::graphql::{
5 GraphQLNormalizeSpan, GraphQLSpanOperationIdentity,
6};
7use hive_router_plan_executor::introspection::partition::partition_operation;
8use hive_router_plan_executor::projection::plan::FieldProjectionPlan;
9use hive_router_query_planner::ast::normalization::normalize_operation;
10use hive_router_query_planner::ast::operation::OperationDefinition;
11use xxhash_rust::xxh3::Xxh3;
12
13use crate::pipeline::error::PipelineError;
14use crate::pipeline::execution_request::ExecutionRequest;
15use crate::pipeline::parser::GraphQLParserPayload;
16use crate::schema_state::{SchemaState, SupergraphData};
17use tracing::{trace, Instrument};
18
19#[derive(Debug, Clone)]
20pub struct GraphQLNormalizationPayload {
21 pub operation_for_plan: Arc<OperationDefinition>,
23 pub operation_for_introspection: Option<Arc<OperationDefinition>>,
24 pub root_type_name: &'static str,
25 pub projection_plan: Arc<Vec<FieldProjectionPlan>>,
26 pub operation_indentity: OperationIdentity,
27}
28
29#[derive(Debug, Clone)]
30pub struct OperationIdentity {
31 pub name: Option<String>,
32 pub operation_type: String,
33 pub client_document_hash: String,
35}
36
37impl<'a> From<&'a OperationIdentity> for GraphQLSpanOperationIdentity<'a> {
38 fn from(op_id: &'a OperationIdentity) -> Self {
39 GraphQLSpanOperationIdentity {
40 name: op_id.name.as_deref(),
41 operation_type: &op_id.operation_type,
42 client_document_hash: &op_id.client_document_hash,
43 }
44 }
45}
46
47#[inline]
48pub async fn normalize_request_with_cache(
49 supergraph: &SupergraphData,
50 schema_state: &SchemaState,
51 execution_params: &ExecutionRequest,
52 parser_payload: &GraphQLParserPayload,
53) -> Result<Arc<GraphQLNormalizationPayload>, PipelineError> {
54 let normalize_span = GraphQLNormalizeSpan::new();
55 async {
56 let cache_key = match &execution_params.operation_name {
57 Some(operation_name) => {
58 let mut hasher = Xxh3::new();
59 execution_params.query.hash(&mut hasher);
60 operation_name.hash(&mut hasher);
61 hasher.finish()
62 }
63 None => parser_payload.cache_key,
64 };
65
66 match schema_state.normalize_cache.get(&cache_key).await {
67 Some(payload) => {
68 trace!(
69 "Found normalized GraphQL operation in cache (operation name={:?}): {}",
70 payload.operation_for_plan.name,
71 payload.operation_for_plan
72 );
73 normalize_span.record_cache_hit(true);
74
75 Ok(payload)
76 }
77 None => {
78 normalize_span.record_cache_hit(false);
79 let doc = normalize_operation(
80 &supergraph.planner.supergraph,
81 &parser_payload.parsed_operation,
82 execution_params.operation_name.as_deref(),
83 )?;
84
85 trace!(
86 "Successfully normalized GraphQL operation (operation name={:?}): {}",
87 doc.operation_name,
88 doc.operation
89 );
90
91 let operation = doc.operation;
92 let (root_type_name, projection_plan) =
93 FieldProjectionPlan::from_operation(&operation, &supergraph.metadata);
94 let partitioned_operation = partition_operation(operation);
95
96 let payload = GraphQLNormalizationPayload {
97 root_type_name,
98 projection_plan: Arc::new(projection_plan),
99 operation_for_plan: Arc::new(partitioned_operation.downstream_operation),
100 operation_for_introspection: partitioned_operation
101 .introspection_operation
102 .map(Arc::new),
103 operation_indentity: OperationIdentity {
104 name: doc.operation_name.clone(),
105 operation_type: parser_payload.operation_type.clone(),
106 client_document_hash: parser_payload.cache_key_string.clone(),
107 },
108 };
109 let payload_arc = Arc::new(payload);
110 schema_state
111 .normalize_cache
112 .insert(cache_key, payload_arc.clone())
113 .await;
114
115 Ok(payload_arc)
116 }
117 }
118 }
119 .instrument(normalize_span.clone())
120 .await
121}