apollo-router 2.13.1

A configurable, high-performance routing runtime for Apollo Federation 🚀
Documentation
//! Tower fetcher for fetch node execution.

use std::sync::Arc;
use std::task::Poll;

use futures::future::BoxFuture;
use serde_json_bytes::Value;
use tower::BoxError;
use tower::ServiceExt;
use tracing::Instrument;
use tracing::instrument::Instrumented;

use super::ConnectRequest;
use super::FetchRequest;
use super::SubgraphRequest;
use super::connector_service::ConnectorServiceFactory;
use super::fetch::AddSubgraphNameExt;
use super::fetch::BoxService;
use super::new_service::ServiceFactory;
use crate::configuration::HoistOrphanErrors;
use crate::configuration::subgraph::SubgraphConfiguration;
use crate::graphql::Request as GraphQLRequest;
use crate::http_ext;
use crate::plugins::subscription::SubscriptionConfig;
use crate::plugins::subscription::fetch_service_handle_subscription;
use crate::query_planner::FETCH_SPAN_NAME;
use crate::query_planner::build_operation_with_aliasing;
use crate::query_planner::fetch::FetchNode;
use crate::query_planner::fetch::SubgraphSchemas;
use crate::services::FetchResponse;
use crate::services::SubgraphServiceFactory;
use crate::services::fetch::ErrorMapping;
use crate::services::fetch::Request;
use crate::spec::Schema;

/// The fetch service delegates to either the subgraph service or connector service depending
/// on whether the fetch node's service name has an entry in the connector service factory's
/// `connectors_by_service_name` map.
///
/// Subscription requests are not executed here; they are forwarded to
/// `fetch_service_handle_subscription`.
#[derive(Clone)]
pub(crate) struct FetchService {
    /// Creates the per-subgraph service used for non-connector fetches; also handed to the
    /// subscription handler.
    pub(crate) subgraph_service_factory: Arc<SubgraphServiceFactory>,
    /// Schema used for subgraph URL lookup, connector lookup, and response processing.
    pub(crate) schema: Arc<Schema>,
    /// Per-subgraph schemas, consulted when an operation has to be rebuilt with aliasing
    /// because the fetch carries contextual arguments.
    pub(crate) subgraph_schemas: Arc<SubgraphSchemas>,
    /// Subscription configuration copied from the factory; not read by the code in this file.
    pub(crate) _subscription_config: Option<SubscriptionConfig>, // TODO: add subscription support to FetchService
    /// Creates connector services and holds the service-name → connector map that decides
    /// which path a fetch takes.
    pub(crate) connector_service_factory: Arc<ConnectorServiceFactory>,
    /// Per-subgraph `HoistOrphanErrors` configuration; its `enabled` flag is forwarded to
    /// response processing.
    pub(crate) hoist_orphan_errors: Arc<SubgraphConfiguration<HoistOrphanErrors>>,
}

impl tower::Service<Request> for FetchService {
    type Response = FetchResponse;
    type Error = BoxError;
    type Future = Instrumented<BoxFuture<'static, Result<Self::Response, Self::Error>>>;

    fn poll_ready(&mut self, _cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    fn call(&mut self, request: Request) -> Self::Future {
        match request {
            Request::Fetch(request) => self.handle_fetch(request),
            Request::Subscription(request) => fetch_service_handle_subscription(
                self.schema.clone(),
                self.subgraph_service_factory.clone(),
                request,
            ),
        }
    }
}

impl FetchService {
    fn handle_fetch(
        &mut self,
        request: FetchRequest,
    ) -> <FetchService as tower::Service<Request>>::Future {
        let FetchRequest {
            ref context,
            fetch_node: FetchNode {
                ref service_name, ..
            },
            ..
        } = request;
        let service_name = service_name.clone();
        let fetch_time_offset = context.created_at.elapsed().as_nanos() as i64;
        let hoist_orphan_errors = self.hoist_orphan_errors.get(&service_name).enabled;

        if let Some(connector) = self
            .connector_service_factory
            .connectors_by_service_name
            .get(service_name.as_ref())
        {
            Self::fetch_with_connector_service(
                self.schema.clone(),
                self.connector_service_factory.clone(),
                connector.id.subgraph_name.clone(),
                request,
                hoist_orphan_errors,
            )
            .instrument(tracing::info_span!(
                FETCH_SPAN_NAME,
                "otel.kind" = "INTERNAL",
                "apollo.subgraph.name" = connector.id.subgraph_name,
                "apollo_private.sent_time_offset" = fetch_time_offset
            ))
        } else {
            Self::fetch_with_subgraph_service(
                self.schema.clone(),
                self.subgraph_service_factory.clone(),
                self.subgraph_schemas.clone(),
                request,
                hoist_orphan_errors,
            )
            .instrument(tracing::info_span!(
                FETCH_SPAN_NAME,
                "otel.kind" = "INTERNAL",
                "apollo.subgraph.name" = service_name.as_ref(),
                "apollo_private.sent_time_offset" = fetch_time_offset
            ))
        }
    }

    /// Executes a fetch node through the connector service.
    ///
    /// Builds a `ConnectRequest` from the fetch request, sends it through a freshly created
    /// connector service, and maps the response onto the fetch node's path via
    /// `FetchNode::response_at_path`.
    ///
    /// # Errors
    ///
    /// The returned future resolves to `Err` when no connector is registered in the schema
    /// for the fetch node's service name, when the connector's resolvable key cannot be
    /// computed, or when the fetch node's operation is not available in parsed form.
    ///
    /// A failure of the connector call itself is *not* an `Err`: it is converted to a GraphQL
    /// error tagged with `subgraph_name` and returned as `Ok((Value::default(), vec![error]))`.
    fn fetch_with_connector_service(
        schema: Arc<Schema>,
        connector_service_factory: Arc<ConnectorServiceFactory>,
        subgraph_name: String,
        request: FetchRequest,
        hoist_orphan_errors: bool,
    ) -> BoxFuture<'static, Result<FetchResponse, BoxError>> {
        let FetchRequest {
            fetch_node,
            supergraph_request,
            variables,
            context,
            current_dir,
            ..
        } = request;

        // `variables` is moved into the connect request below, so keep a copy of the
        // inverted paths for response processing.
        let paths = variables.inverted_paths.clone();
        // Owned clone of the parsed operation (or the error explaining why there is none).
        let operation = fetch_node.operation.as_parsed().cloned();

        Box::pin(async move {
            let connector = schema
                .connectors
                .as_ref()
                .and_then(|c| c.by_service_name.get(&fetch_node.service_name))
                .ok_or("no connector found for service")?;

            let keys = connector.resolvable_key(schema.supergraph_schema())?;

            let (_parts, response) = match connector_service_factory
                .create()
                .oneshot(
                    ConnectRequest::builder()
                        .service_name(fetch_node.service_name.clone())
                        .context(context)
                        // `operation` is already owned, so it is moved rather than cloned again.
                        .operation(operation?)
                        .supergraph_request(supergraph_request)
                        .variables(variables)
                        .and_keys(keys)
                        .build(),
                )
                .await
                // Borrow `current_dir` directly; no temporary copy is needed for a reference.
                .map_to_graphql_error(subgraph_name.clone(), &current_dir)
                .with_subgraph_name(subgraph_name.as_str())
            {
                // A failed connector call becomes a GraphQL error in a successful response.
                Err(e) => {
                    return Ok((Value::default(), vec![e]));
                }
                Ok(res) => res.response.into_parts(),
            };

            let (value, errors) = fetch_node.response_at_path(
                &schema,
                &current_dir,
                paths,
                response,
                hoist_orphan_errors,
            );
            Ok((value, errors))
        })
    }

    /// Executes a fetch node against its subgraph service.
    ///
    /// The subgraph request is assembled synchronously (URL lookup, optional operation
    /// aliasing, service creation); the returned future then runs `FetchNode::subgraph_fetch`
    /// with it.
    ///
    /// When the fetch carries contextual arguments, the operation is rebuilt with aliasing
    /// against the subgraph's schema. If the subgraph schema is missing or the rebuild fails,
    /// a debug event is logged and the serialized operation is sent unchanged.
    ///
    /// # Panics
    ///
    /// Panics if the schema has no URL for the subgraph, if the subgraph service factory
    /// cannot create a service for it, or if the HTTP request cannot be built.
    fn fetch_with_subgraph_service(
        schema: Arc<Schema>,
        subgraph_service_factory: Arc<SubgraphServiceFactory>,
        subgraph_schemas: Arc<SubgraphSchemas>,
        request: FetchRequest,
        hoist_orphan_errors: bool,
    ) -> BoxFuture<'static, Result<FetchResponse, BoxError>> {
        let FetchRequest {
            fetch_node,
            supergraph_request,
            variables,
            current_dir,
            context,
        } = request;

        let FetchNode {
            ref service_name,
            ref operation,
            ref operation_kind,
            ref operation_name,
            ..
        } = fetch_node;

        let uri = schema
            .subgraph_url(service_name.as_ref())
            .unwrap_or_else(|| {
                panic!("schema uri for subgraph '{service_name}' should already have been checked")
            })
            .clone();

        let alias_query_string; // this exists outside the if block to allow the as_str() to be longer lived
        let aliased_operation = if let Some(ctx_arg) = &variables.contextual_arguments {
            if let Some(subgraph_schema) = subgraph_schemas.get(&service_name.to_string()) {
                match build_operation_with_aliasing(operation, ctx_arg, &subgraph_schema.schema) {
                    Ok(op) => {
                        alias_query_string = op.serialize().no_indent().to_string();
                        alias_query_string.as_str()
                    }
                    Err(errors) => {
                        tracing::debug!(
                            "couldn't generate a valid executable document? {:?}",
                            errors
                        );
                        operation.as_serialized()
                    }
                }
            } else {
                tracing::debug!(
                    "couldn't find a subgraph schema for service {:?}",
                    &service_name
                );
                operation.as_serialized()
            }
        } else {
            operation.as_serialized()
        };

        // Owned copy of the query text: the future below must be `'static`, so it cannot
        // borrow from `alias_query_string` or from the operation.
        let aqs = aliased_operation.to_string(); // TODO
        // `service_name` is already a reference; no clone is needed to borrow it.
        let service = subgraph_service_factory
            .create(service_name)
            .expect("we already checked that the service exists during planning; qed");

        // `supergraph_request` and `context` are owned and not used again, so they are moved
        // into the builder rather than cloned.
        let mut subgraph_request = SubgraphRequest::builder()
            .supergraph_request(supergraph_request)
            .subgraph_request(
                http_ext::Request::builder()
                    .method(http::Method::POST)
                    .uri(uri)
                    .body(
                        GraphQLRequest::builder()
                            .query(aliased_operation)
                            .and_operation_name(operation_name.as_ref().map(|n| n.to_string()))
                            // Cloned because the variables are also passed to `subgraph_fetch`.
                            .variables(variables.variables.clone())
                            .build(),
                    )
                    .build()
                    .expect("it won't fail because the url is correct and already checked; qed"),
            )
            .subgraph_name(service_name.to_string())
            .operation_kind(*operation_kind)
            .and_executable_document(operation.as_parsed().ok().cloned())
            .context(context)
            .build();
        subgraph_request.query_hash = fetch_node.schema_aware_hash.clone();
        subgraph_request.authorization = fetch_node.authorization.clone();
        // `current_dir` is owned, so the `async move` block takes it directly.
        Box::pin(async move {
            Ok(fetch_node
                .subgraph_fetch(
                    service,
                    subgraph_request,
                    &current_dir,
                    &schema,
                    variables.inverted_paths,
                    &aqs,
                    variables.variables,
                    hoist_orphan_errors,
                )
                .await)
        })
    }
}

/// Factory producing boxed [`FetchService`] instances.
///
/// Holds the shared state that every created service receives a clone of.
#[derive(Clone)]
pub(crate) struct FetchServiceFactory {
    /// Schema handed to each created service.
    pub(crate) schema: Arc<Schema>,
    /// Per-subgraph schemas handed to each created service.
    pub(crate) subgraph_schemas: Arc<SubgraphSchemas>,
    /// Factory for subgraph services, shared with each created service.
    pub(crate) subgraph_service_factory: Arc<SubgraphServiceFactory>,
    /// Optional subscription configuration; stored on the created service as
    /// `_subscription_config`.
    pub(crate) subscription_config: Option<SubscriptionConfig>,
    /// Factory for connector services, shared with each created service.
    pub(crate) connector_service_factory: Arc<ConnectorServiceFactory>,
    /// Per-subgraph `HoistOrphanErrors` configuration, shared with each created service.
    pub(crate) hoist_orphan_errors: Arc<SubgraphConfiguration<HoistOrphanErrors>>,
}

impl FetchServiceFactory {
    pub(crate) fn new(
        schema: Arc<Schema>,
        subgraph_schemas: Arc<SubgraphSchemas>,
        subgraph_service_factory: Arc<SubgraphServiceFactory>,
        subscription_config: Option<SubscriptionConfig>,
        connector_service_factory: Arc<ConnectorServiceFactory>,
        hoist_orphan_errors: Arc<SubgraphConfiguration<HoistOrphanErrors>>,
    ) -> Self {
        Self {
            subgraph_service_factory,
            subgraph_schemas,
            schema,
            subscription_config,
            connector_service_factory,
            hoist_orphan_errors,
        }
    }
}

impl ServiceFactory<Request> for FetchServiceFactory {
    type Service = BoxService;

    fn create(&self) -> Self::Service {
        FetchService {
            subgraph_service_factory: self.subgraph_service_factory.clone(),
            schema: self.schema.clone(),
            subgraph_schemas: self.subgraph_schemas.clone(),
            _subscription_config: self.subscription_config.clone(),
            connector_service_factory: self.connector_service_factory.clone(),
            hoist_orphan_errors: self.hoist_orphan_errors.clone(),
        }
        .boxed()
    }
}