apollo-router 2.14.0-rc.2

A configurable, high-performance routing runtime for Apollo Federation 🚀
Documentation
//! Contains the core of the fetch service implementation for fetching subscription query plan
//! nodes.

use std::sync::Arc;
use std::sync::atomic::Ordering;

use futures::future::BoxFuture;
use serde_json_bytes::Value;
use tokio::sync::mpsc;
use tower::BoxError;
use tower::ServiceExt;
use tracing::Instrument;
use tracing::instrument::Instrumented;

use crate::error::Error;
use crate::http_ext;
use crate::plugins::subscription::SUBSCRIPTION_SUBGRAPH_NAME_CONTEXT_KEY;
use crate::plugins::subscription::SubscriptionTaskParams;
use crate::query_planner::OperationKind;
use crate::query_planner::SUBSCRIBE_SPAN_NAME;
use crate::query_planner::subscription::OPENED_SUBSCRIPTIONS;
use crate::query_planner::subscription::SubscriptionNode;
use crate::services::FetchResponse;
use crate::services::SubgraphRequest;
use crate::services::SubgraphServiceFactory;
use crate::services::fetch::ErrorMapping;
use crate::services::fetch::SubscriptionRequest;
use crate::services::subgraph::BoxGqlStream;
use crate::spec::Schema;

/// Execute the fetches required to fulfill a subscription query plan node.
///
/// Calls into the relevant subgraph service to run the actual requests. This means the
/// [SubgraphServiceFactory] must produce services that have the [SubscriptionSubgraphLayer][]
/// applied!
///
/// [SubscriptionSubgraphLayer]: super::subgraph::SubscriptionSubgraphLayer
pub(crate) fn fetch_service_handle_subscription(
    schema: Arc<Schema>,
    subgraph_service_factory: Arc<SubgraphServiceFactory>,
    request: SubscriptionRequest,
) -> Instrumented<BoxFuture<'static, Result<FetchResponse, BoxError>>> {
    let SubscriptionRequest {
        ref context,
        subscription_node: SubscriptionNode {
            ref service_name, ..
        },
        ..
    } = request;

    let service_name = service_name.clone();
    let fetch_time_offset = context.created_at.elapsed().as_nanos() as i64;

    // Store the subgraph name in context so it's available for metrics at the router layer
    let _ = context.insert(
        SUBSCRIPTION_SUBGRAPH_NAME_CONTEXT_KEY,
        service_name.to_string(),
    );

    // Subscriptions are not supported for connectors, so they always go to the subgraph service
    subscription_with_subgraph_service(schema, subgraph_service_factory, request).instrument(
        tracing::info_span!(
            SUBSCRIBE_SPAN_NAME,
            "otel.kind" = "INTERNAL",
            "apollo.subgraph.name" = service_name.as_ref(),
            "apollo_private.sent_time_offset" = fetch_time_offset
        ),
    )
}

fn subscription_with_subgraph_service(
    schema: Arc<Schema>,
    subgraph_service_factory: Arc<SubgraphServiceFactory>,
    request: SubscriptionRequest,
) -> BoxFuture<'static, Result<crate::services::fetch::Response, BoxError>> {
    let SubscriptionRequest {
        context,
        subscription_node,
        current_dir,
        sender,
        variables,
        supergraph_request,
        subscription_handle,
        subscription_config,
        ..
    } = request;
    let SubscriptionNode {
        ref service_name,
        ref operation,
        ref operation_name,
        ..
    } = subscription_node;

    let service_name = service_name.clone();

    if let Some(max_opened_subscriptions) = subscription_config
        .as_ref()
        .and_then(|s| s.max_opened_subscriptions)
        && OPENED_SUBSCRIPTIONS.load(Ordering::Relaxed) >= max_opened_subscriptions
    {
        u64_counter!(
            "apollo.router.operations.subscriptions.rejected",
            "Number of subscription requests rejected",
            1,
            reason = "max_opened_subscriptions_limit_reached"
        );
        return Box::pin(async {
            Ok((
                Value::default(),
                vec![
                    Error::builder()
                        .message("can't open new subscription, limit reached")
                        .extension_code("SUBSCRIPTION_MAX_LIMIT")
                        .build(),
                ],
            ))
        });
    }
    let mode = match subscription_config.as_ref() {
        Some(config) => config
            .mode
            .get_subgraph_config(&service_name)
            .map(|mode| (config.clone(), mode)),
        None => {
            return Box::pin(async {
                Ok((
                    Value::default(),
                    vec![
                        Error::builder()
                            .message("subscription support is not enabled")
                            .extension_code("SUBSCRIPTION_DISABLED")
                            .build(),
                    ],
                ))
            });
        }
    };

    let service = subgraph_service_factory
        .create(&service_name)
        .expect("we already checked that the service exists during planning; qed");

    let uri = schema
        .subgraph_url(service_name.as_ref())
        .unwrap_or_else(|| {
            panic!("schema uri for subgraph '{service_name}' should already have been checked")
        })
        .clone();

    let (tx_handle, rx_handle) = mpsc::channel::<BoxGqlStream>(1);

    let subscription_handle = subscription_handle
        .as_ref()
        .expect("checked in PlanNode; qed");

    let subgraph_request = SubgraphRequest::builder()
        .supergraph_request(supergraph_request.clone())
        .subgraph_request(
            http_ext::Request::builder()
                .method(http::Method::POST)
                .uri(uri)
                .body(
                    crate::graphql::Request::builder()
                        .query(operation.as_serialized())
                        .and_operation_name(operation_name.as_ref().map(|n| n.to_string()))
                        .variables(variables.variables.clone())
                        .build(),
                )
                .build()
                .expect("it won't fail because the url is correct and already checked; qed"),
        )
        .operation_kind(OperationKind::Subscription)
        .context(context)
        .subgraph_name(service_name.to_string())
        .subscription_stream(tx_handle)
        .and_connection_closed_signal(Some(subscription_handle.closed_signal.resubscribe()))
        .build();

    let mut subscription_handle = subscription_handle.clone();
    Box::pin(async move {
        let response = match mode {
            Some((subscription_config, _mode)) => {
                let subscription_params = SubscriptionTaskParams {
                    client_sender: sender,
                    subscription_handle: subscription_handle.clone(),
                    subscription_config: subscription_config.clone(),
                    stream_rx: rx_handle.into(),
                };

                let subscription_conf_tx =
                    match subscription_handle.subscription_conf_tx.take() {
                        Some(sc) => sc,
                        None => {
                            return Ok((
                                Value::default(),
                                vec![Error::builder()
                            .message("no subscription conf sender provided for a subscription")
                            .extension_code("NO_SUBSCRIPTION_CONF_TX")
                            .build()],
                            ));
                        }
                    };

                if let Err(err) = subscription_conf_tx.send(subscription_params).await {
                    return Ok((
                        Value::default(),
                        vec![
                            Error::builder()
                                .message(format!("cannot send the subscription data: {err:?}"))
                                .extension_code("SUBSCRIPTION_DATA_SEND_ERROR")
                                .build(),
                        ],
                    ));
                }

                match service
                    .oneshot(subgraph_request)
                    .instrument(tracing::trace_span!("subscription_call"))
                    .await
                    .map_to_graphql_error(service_name.to_string(), &current_dir)
                {
                    Err(e) => {
                        failfast_error!("subgraph call fetch error: {}", e);
                        vec![e]
                    }
                    Ok(response) => response.response.into_parts().1.errors,
                }
            }
            None => {
                vec![
                    Error::builder()
                        .message(format!(
                            "subscription mode is not configured for subgraph {service_name:?}"
                        ))
                        .extension_code("INVALID_SUBSCRIPTION_MODE")
                        .build(),
                ]
            }
        };
        Ok((Value::default(), response))
    })
}

#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;
    use std::sync::atomic::Ordering;

    use apollo_federation::query_plan::serializable_document::SerializableDocument;
    use serde_json_bytes::Value;
    use tokio::sync::mpsc;

    use super::subscription_with_subgraph_service;
    use crate::Context;
    use crate::json_ext::Path;
    use crate::metrics::FutureMetricsExt;
    use crate::plugins::subscription::SubscriptionConfig;
    use crate::query_planner::OperationKind;
    use crate::query_planner::fetch::Variables;
    use crate::query_planner::subscription::OPENED_SUBSCRIPTIONS;
    use crate::query_planner::subscription::SubscriptionNode;
    use crate::services::SubgraphServiceFactory;
    use crate::services::fetch::SubscriptionRequest;

    #[tokio::test]
    async fn test_subscription_limit_reached_emits_metric() {
        async {
            let original_count = OPENED_SUBSCRIPTIONS.swap(1, Ordering::Relaxed);

            let subscription_config = SubscriptionConfig {
                max_opened_subscriptions: Some(1),
                ..Default::default()
            };

            let subscription_node = SubscriptionNode {
                service_name: Arc::from("subgraph-a"),
                variable_usages: Vec::new(),
                operation: SerializableDocument::from_string("subscription { onEvent { id } }"),
                operation_name: None,
                operation_kind: OperationKind::Subscription,
                input_rewrites: None,
                output_rewrites: None,
            };

            let (sender, _receiver) = mpsc::channel(1);
            let supergraph_request = Arc::new(
                http::Request::builder()
                    .body(crate::graphql::Request::builder().build())
                    .unwrap(),
            );

            let schema = Arc::new(
                crate::spec::Schema::parse(
                    include_str!("../../testdata/minimal_supergraph.graphql"),
                    &Default::default(),
                )
                .expect("could not parse schema"),
            );

            let factory = Arc::new(SubgraphServiceFactory {
                services: Arc::new(HashMap::new()),
            });

            let request = SubscriptionRequest::builder()
                .context(Context::new())
                .subscription_node(subscription_node)
                .supergraph_request(supergraph_request)
                .variables(Variables::default())
                .current_dir(Path(Vec::new()))
                .sender(sender)
                .subscription_config(subscription_config)
                .build();

            let (data, errors) = subscription_with_subgraph_service(schema, factory, request)
                .await
                .expect("call should not fail");

            assert_eq!(data, Value::default());
            assert_eq!(errors.len(), 1);
            assert_eq!(
                errors[0].message,
                "can't open new subscription, limit reached"
            );
            assert_eq!(
                errors[0].extensions.get("code").and_then(|v| v.as_str()),
                Some("SUBSCRIPTION_MAX_LIMIT")
            );
            assert_counter!(
                "apollo.router.operations.subscriptions.rejected",
                1,
                "reason" = "max_opened_subscriptions_limit_reached"
            );

            OPENED_SUBSCRIPTIONS.store(original_count, Ordering::Relaxed);
        }
        .with_metrics()
        .await;
    }
}