//! hive-router 0.0.54
//!
//! GraphQL router/gateway for Federation
use std::hash::{Hash, Hasher};
use std::sync::Arc;

use hive_router_internal::telemetry::traces::spans::graphql::{
    GraphQLNormalizeSpan, GraphQLSpanOperationIdentity,
};
use hive_router_plan_executor::hooks::on_graphql_params::GraphQLParams;
use hive_router_plan_executor::hooks::on_supergraph_load::SupergraphData;
use hive_router_plan_executor::introspection::partition::partition_operation;
use hive_router_plan_executor::projection::plan::FieldProjectionPlan;
use hive_router_query_planner::ast::normalization::error::NormalizationError;
use hive_router_query_planner::ast::normalization::normalize_operation;
use hive_router_query_planner::ast::operation::OperationDefinition;
use hive_router_query_planner::state::supergraph_state::OperationKind;
use xxhash_rust::xxh3::Xxh3;

use crate::cache_state::{CacheHitMiss, EntryResultHitMissExt};
use crate::pipeline::error::PipelineError;
use crate::pipeline::parser::GraphQLParserPayload;
use crate::schema_state::SchemaState;
use tracing::{trace, Instrument};

/// Result of normalizing a parsed GraphQL request against the supergraph:
/// the operation split into a plannable part and an optional introspection
/// part, the hashes derived from both, and the field projection plan.
/// Cached in `SchemaState::normalize_cache` and shared via `Arc`.
#[derive(Debug, Clone)]
pub struct GraphQLNormalizationPayload {
    /// The operation to execute, without introspection fields.
    pub operation_for_plan: Arc<OperationDefinition>,
    /// Hash of `operation_for_plan` alone (see `hash_normalized_operation`).
    pub operation_for_plan_hash: u64,
    /// The introspection-only part of the operation, if any was present.
    pub operation_for_introspection: Option<Arc<OperationDefinition>>,
    /// Hash of the introspection part; `Some` iff `operation_for_introspection` is.
    pub operation_for_introspection_hash: Option<u64>,
    /// Combined hash over both parts (see `hash_normalized_operation`).
    pub normalized_operation_hash: u64,
    /// Root type name as determined by `FieldProjectionPlan::from_operation`.
    pub root_type_name: &'static str,
    /// Projection plan computed from the normalized operation.
    pub projection_plan: Arc<Vec<FieldProjectionPlan>>,
    /// Identity of the operation as originally sent by the client.
    // NOTE(review): "indentity" is a typo for "identity". Renaming this public
    // field would break callers outside this file, so fix it in a coordinated
    // rename across the crate rather than here.
    pub operation_indentity: OperationIdentity,
}

/// Identifies the client-submitted operation for telemetry purposes
/// (convertible into `GraphQLSpanOperationIdentity` for span attributes).
#[derive(Debug, Clone)]
pub struct OperationIdentity {
    /// Operation name from the normalized document, when one was present.
    pub name: Option<String>,
    /// Kind of the operation (query/mutation/subscription variant).
    pub operation_type: OperationKind,
    /// Hash of the original document sent to the router, by the client.
    pub client_document_hash: String,
}

impl<'a> From<&'a OperationIdentity> for GraphQLSpanOperationIdentity<'a> {
    fn from(op_id: &'a OperationIdentity) -> Self {
        GraphQLSpanOperationIdentity {
            name: op_id.name.as_deref(),
            operation_type: op_id.operation_type.as_str(),
            client_document_hash: &op_id.client_document_hash,
        }
    }
}

/// Computes the per-part and combined hashes of a normalized operation.
///
/// The combined hash folds in the plan-part hash, a boolean recording whether
/// an introspection part exists, and — when present — the introspection-part
/// hash. Hashing the presence flag keeps "no introspection part" distinct
/// from any particular introspection hash value.
pub fn hash_normalized_operation(
    operation_for_plan: &OperationDefinition,
    operation_for_introspection: Option<&OperationDefinition>,
) -> NormalizedOperationHashes {
    let plan_hash = operation_for_plan.hash();
    let introspection_hash = operation_for_introspection.map(OperationDefinition::hash);

    let mut combined = Xxh3::new();
    plan_hash.hash(&mut combined);
    introspection_hash.is_some().hash(&mut combined);
    if let Some(h) = introspection_hash {
        h.hash(&mut combined);
    }

    NormalizedOperationHashes {
        operation_for_plan_hash: plan_hash,
        operation_for_introspection_hash: introspection_hash,
        combined_operation_hash: combined.finish(),
    }
}

/// Hashes produced by `hash_normalized_operation` for one normalized operation.
pub struct NormalizedOperationHashes {
    /// Hash of the plan (downstream) part of the operation.
    pub operation_for_plan_hash: u64,
    /// Hash of the introspection part, when one exists.
    pub operation_for_introspection_hash: Option<u64>,
    /// XXH3 hash combining the plan hash, the presence flag, and (if present)
    /// the introspection hash.
    pub combined_operation_hash: u64,
}

/// Normalizes a parsed GraphQL request against the supergraph, with caching.
///
/// Looks up (or computes and inserts) the normalization payload in
/// `SchemaState::normalize_cache`, and records the cache hit/miss outcome on
/// both the normalize span and the normalize-cache metrics. The whole lookup
/// runs instrumented under a `GraphQLNormalizeSpan`.
///
/// # Errors
/// Returns a `PipelineError` (converted from `NormalizationError`) when
/// `normalize_operation` fails for this request.
#[inline]
pub async fn normalize_request_with_cache(
    supergraph: &SupergraphData,
    schema_state: &SchemaState,
    graphql_params: &GraphQLParams,
    parser_payload: &GraphQLParserPayload,
) -> Result<Arc<GraphQLNormalizationPayload>, PipelineError> {
    let metrics = &schema_state.telemetry_context.metrics;
    let normalize_cache_capture = metrics.cache.normalize.capture_request();
    let normalize_span = GraphQLNormalizeSpan::new();
    async {
        // The parser's cache key covers only the raw document. When the client
        // also sent an operation name, the normalized result depends on that
        // name too, so fold both the query text and the name into a fresh key.
        let cache_key = match &graphql_params.operation_name {
            Some(operation_name) => {
                let mut hasher = Xxh3::new();
                graphql_params.query.hash(&mut hasher);
                operation_name.hash(&mut hasher);
                hasher.finish()
            }
            None => parser_payload.cache_key,
        };

        schema_state
            .normalize_cache
            .entry(cache_key)
            // The async block below is only evaluated on a cache miss.
            .or_try_insert_with::<_, NormalizationError>(async {
                let doc = normalize_operation(
                    &supergraph.planner.supergraph,
                    &parser_payload.parsed_operation,
                    graphql_params.operation_name.as_deref(),
                )?;

                trace!(
                    "Successfully normalized GraphQL operation (operation name={:?}): {}",
                    doc.operation_name,
                    doc.operation
                );

                let operation = doc.operation;
                // Build the projection plan before partitioning, since
                // `partition_operation` consumes the operation by value.
                let (root_type_name, projection_plan) =
                    FieldProjectionPlan::from_operation(&operation, &supergraph.metadata);
                // Split introspection fields away from the part that goes to
                // the query planner.
                let partitioned_operation = partition_operation(operation);

                let operation_for_plan = Arc::new(partitioned_operation.downstream_operation);
                let operation_for_introspection =
                    partitioned_operation.introspection_operation.map(Arc::new);

                let hashes = hash_normalized_operation(
                    &operation_for_plan,
                    operation_for_introspection.as_deref(),
                );

                let payload = GraphQLNormalizationPayload {
                    root_type_name,
                    projection_plan: Arc::new(projection_plan),
                    operation_for_plan,
                    operation_for_plan_hash: hashes.operation_for_plan_hash,
                    operation_for_introspection,
                    operation_for_introspection_hash: hashes.operation_for_introspection_hash,
                    normalized_operation_hash: hashes.combined_operation_hash,
                    operation_indentity: OperationIdentity {
                        name: doc.operation_name.clone(),
                        operation_type: parser_payload.operation_type.clone(),
                        client_document_hash: parser_payload.cache_key_string.clone(),
                    },
                };

                Ok(Arc::new(payload))
            })
            .await
            .map_err(PipelineError::from)
            // Report the outcome to both the span and the cache metrics; a
            // normalization error is counted as a miss.
            .into_result_with_hit_miss(|hit_miss| match hit_miss {
                CacheHitMiss::Hit => {
                    normalize_span.record_cache_hit(true);
                    normalize_cache_capture.finish_hit();
                }
                CacheHitMiss::Miss | CacheHitMiss::Error => {
                    normalize_span.record_cache_hit(false);
                    normalize_cache_capture.finish_miss();
                }
            })
    }
    .instrument(normalize_span.clone())
    .await
}