use std::sync::Arc;
use std::sync::atomic::Ordering;
use futures::future::BoxFuture;
use serde_json_bytes::Value;
use tokio::sync::mpsc;
use tower::BoxError;
use tower::ServiceExt;
use tracing::Instrument;
use tracing::instrument::Instrumented;
use crate::error::Error;
use crate::http_ext;
use crate::plugins::subscription::SUBSCRIPTION_SUBGRAPH_NAME_CONTEXT_KEY;
use crate::plugins::subscription::SubscriptionTaskParams;
use crate::query_planner::OperationKind;
use crate::query_planner::SUBSCRIBE_SPAN_NAME;
use crate::query_planner::subscription::OPENED_SUBSCRIPTIONS;
use crate::query_planner::subscription::SubscriptionNode;
use crate::services::FetchResponse;
use crate::services::SubgraphRequest;
use crate::services::SubgraphServiceFactory;
use crate::services::fetch::ErrorMapping;
use crate::services::fetch::SubscriptionRequest;
use crate::services::subgraph::BoxGqlStream;
use crate::spec::Schema;
/// Starts handling a subscription fetch and wraps the resulting future in the
/// subscribe tracing span.
///
/// Before delegating to [`subscription_with_subgraph_service`], the target
/// subgraph name is stored in the request context under
/// `SUBSCRIPTION_SUBGRAPH_NAME_CONTEXT_KEY` (the result of that insert is
/// deliberately ignored).
///
/// The returned future is instrumented with an info-level span named
/// `SUBSCRIBE_SPAN_NAME` carrying the subgraph name and the number of
/// nanoseconds elapsed since `context.created_at` at the time of this call.
pub(crate) fn fetch_service_handle_subscription(
    schema: Arc<Schema>,
    subgraph_service_factory: Arc<SubgraphServiceFactory>,
    request: SubscriptionRequest,
) -> Instrumented<BoxFuture<'static, Result<FetchResponse, BoxError>>> {
    // Keep our own copy of the name: `request` is moved into the inner call below.
    let subgraph_name = request.subscription_node.service_name.clone();
    let sent_time_offset = request.context.created_at.elapsed().as_nanos() as i64;

    let _ = request.context.insert(
        SUBSCRIPTION_SUBGRAPH_NAME_CONTEXT_KEY,
        subgraph_name.to_string(),
    );

    // Build the future first and the span second, matching the evaluation
    // order of a `future.instrument(span)` expression.
    let subscription =
        subscription_with_subgraph_service(schema, subgraph_service_factory, request);
    let subscribe_span = tracing::info_span!(
        SUBSCRIBE_SPAN_NAME,
        "otel.kind" = "INTERNAL",
        "apollo.subgraph.name" = subgraph_name.as_ref(),
        "apollo_private.sent_time_offset" = sent_time_offset
    );

    subscription.instrument(subscribe_span)
}
fn subscription_with_subgraph_service(
schema: Arc<Schema>,
subgraph_service_factory: Arc<SubgraphServiceFactory>,
request: SubscriptionRequest,
) -> BoxFuture<'static, Result<crate::services::fetch::Response, BoxError>> {
let SubscriptionRequest {
context,
subscription_node,
current_dir,
sender,
variables,
supergraph_request,
subscription_handle,
subscription_config,
..
} = request;
let SubscriptionNode {
ref service_name,
ref operation,
ref operation_name,
..
} = subscription_node;
let service_name = service_name.clone();
if let Some(max_opened_subscriptions) = subscription_config
.as_ref()
.and_then(|s| s.max_opened_subscriptions)
&& OPENED_SUBSCRIPTIONS.load(Ordering::Relaxed) >= max_opened_subscriptions
{
u64_counter!(
"apollo.router.operations.subscriptions.rejected",
"Number of subscription requests rejected",
1,
reason = "max_opened_subscriptions_limit_reached"
);
return Box::pin(async {
Ok((
Value::default(),
vec![
Error::builder()
.message("can't open new subscription, limit reached")
.extension_code("SUBSCRIPTION_MAX_LIMIT")
.build(),
],
))
});
}
let mode = match subscription_config.as_ref() {
Some(config) => config
.mode
.get_subgraph_config(&service_name)
.map(|mode| (config.clone(), mode)),
None => {
return Box::pin(async {
Ok((
Value::default(),
vec![
Error::builder()
.message("subscription support is not enabled")
.extension_code("SUBSCRIPTION_DISABLED")
.build(),
],
))
});
}
};
let service = subgraph_service_factory
.create(&service_name)
.expect("we already checked that the service exists during planning; qed");
let uri = schema
.subgraph_url(service_name.as_ref())
.unwrap_or_else(|| {
panic!("schema uri for subgraph '{service_name}' should already have been checked")
})
.clone();
let (tx_handle, rx_handle) = mpsc::channel::<BoxGqlStream>(1);
let subscription_handle = subscription_handle
.as_ref()
.expect("checked in PlanNode; qed");
let subgraph_request = SubgraphRequest::builder()
.supergraph_request(supergraph_request.clone())
.subgraph_request(
http_ext::Request::builder()
.method(http::Method::POST)
.uri(uri)
.body(
crate::graphql::Request::builder()
.query(operation.as_serialized())
.and_operation_name(operation_name.as_ref().map(|n| n.to_string()))
.variables(variables.variables.clone())
.build(),
)
.build()
.expect("it won't fail because the url is correct and already checked; qed"),
)
.operation_kind(OperationKind::Subscription)
.context(context)
.subgraph_name(service_name.to_string())
.subscription_stream(tx_handle)
.and_connection_closed_signal(Some(subscription_handle.closed_signal.resubscribe()))
.build();
let mut subscription_handle = subscription_handle.clone();
Box::pin(async move {
let response = match mode {
Some((subscription_config, _mode)) => {
let subscription_params = SubscriptionTaskParams {
client_sender: sender,
subscription_handle: subscription_handle.clone(),
subscription_config: subscription_config.clone(),
stream_rx: rx_handle.into(),
};
let subscription_conf_tx =
match subscription_handle.subscription_conf_tx.take() {
Some(sc) => sc,
None => {
return Ok((
Value::default(),
vec![Error::builder()
.message("no subscription conf sender provided for a subscription")
.extension_code("NO_SUBSCRIPTION_CONF_TX")
.build()],
));
}
};
if let Err(err) = subscription_conf_tx.send(subscription_params).await {
return Ok((
Value::default(),
vec![
Error::builder()
.message(format!("cannot send the subscription data: {err:?}"))
.extension_code("SUBSCRIPTION_DATA_SEND_ERROR")
.build(),
],
));
}
match service
.oneshot(subgraph_request)
.instrument(tracing::trace_span!("subscription_call"))
.await
.map_to_graphql_error(service_name.to_string(), ¤t_dir)
{
Err(e) => {
failfast_error!("subgraph call fetch error: {}", e);
vec![e]
}
Ok(response) => response.response.into_parts().1.errors,
}
}
None => {
vec![
Error::builder()
.message(format!(
"subscription mode is not configured for subgraph {service_name:?}"
))
.extension_code("INVALID_SUBSCRIPTION_MODE")
.build(),
]
}
};
Ok((Value::default(), response))
})
}
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;
    use std::sync::atomic::Ordering;
    use apollo_federation::query_plan::serializable_document::SerializableDocument;
    use serde_json_bytes::Value;
    use tokio::sync::mpsc;
    use super::subscription_with_subgraph_service;
    use crate::Context;
    use crate::json_ext::Path;
    use crate::metrics::FutureMetricsExt;
    use crate::plugins::subscription::SubscriptionConfig;
    use crate::query_planner::OperationKind;
    use crate::query_planner::fetch::Variables;
    use crate::query_planner::subscription::OPENED_SUBSCRIPTIONS;
    use crate::query_planner::subscription::SubscriptionNode;
    use crate::services::SubgraphServiceFactory;
    use crate::services::fetch::SubscriptionRequest;

    /// Runs the wrapped closure when dropped, including while unwinding from a
    /// failed assertion. Used to undo changes to process-global state.
    struct RunOnDrop<F: Fn()>(F);

    impl<F: Fn()> Drop for RunOnDrop<F> {
        fn drop(&mut self) {
            (self.0)();
        }
    }

    /// With the opened-subscription counter already at the configured limit,
    /// the call must return the `SUBSCRIPTION_MAX_LIMIT` error and increment
    /// the rejection counter.
    #[tokio::test]
    async fn test_subscription_limit_reached_emits_metric() {
        async {
            // Force the global counter to the limit, and make sure the previous
            // value is put back even if an assertion below panics.
            let original_count = OPENED_SUBSCRIPTIONS.swap(1, Ordering::Relaxed);
            let _restore_opened_subscriptions = RunOnDrop(move || {
                OPENED_SUBSCRIPTIONS.store(original_count, Ordering::Relaxed);
            });

            let subscription_config = SubscriptionConfig {
                max_opened_subscriptions: Some(1),
                ..Default::default()
            };
            let subscription_node = SubscriptionNode {
                service_name: Arc::from("subgraph-a"),
                variable_usages: Vec::new(),
                operation: SerializableDocument::from_string("subscription { onEvent { id } }"),
                operation_name: None,
                operation_kind: OperationKind::Subscription,
                input_rewrites: None,
                output_rewrites: None,
            };
            let (sender, _receiver) = mpsc::channel(1);
            let supergraph_request = Arc::new(
                http::Request::builder()
                    .body(crate::graphql::Request::builder().build())
                    .unwrap(),
            );
            let schema = Arc::new(
                crate::spec::Schema::parse(
                    include_str!("../../testdata/minimal_supergraph.graphql"),
                    &Default::default(),
                )
                .expect("could not parse schema"),
            );
            // No services are registered: the limit check returns before any
            // subgraph service is looked up.
            let factory = Arc::new(SubgraphServiceFactory {
                services: Arc::new(HashMap::new()),
            });
            let request = SubscriptionRequest::builder()
                .context(Context::new())
                .subscription_node(subscription_node)
                .supergraph_request(supergraph_request)
                .variables(Variables::default())
                .current_dir(Path(Vec::new()))
                .sender(sender)
                .subscription_config(subscription_config)
                .build();

            let (data, errors) = subscription_with_subgraph_service(schema, factory, request)
                .await
                .expect("call should not fail");

            assert_eq!(data, Value::default());
            assert_eq!(errors.len(), 1);
            assert_eq!(
                errors[0].message,
                "can't open new subscription, limit reached"
            );
            assert_eq!(
                errors[0].extensions.get("code").and_then(|v| v.as_str()),
                Some("SUBSCRIPTION_MAX_LIMIT")
            );
            assert_counter!(
                "apollo.router.operations.subscriptions.rejected",
                1,
                "reason" = "max_opened_subscriptions_limit_reached"
            );
        }
        .with_metrics()
        .await;
    }
}