use std::sync::Arc;
use std::sync::atomic::Ordering;
use futures::future::BoxFuture;
use serde_json_bytes::Value;
use tokio::sync::mpsc;
use tower::BoxError;
use tower::ServiceExt;
use tracing::Instrument;
use tracing::instrument::Instrumented;
use crate::error::Error;
use crate::http_ext;
use crate::plugins::subscription::SubscriptionTaskParams;
use crate::query_planner::OperationKind;
use crate::query_planner::SUBSCRIBE_SPAN_NAME;
use crate::query_planner::subscription::OPENED_SUBSCRIPTIONS;
use crate::query_planner::subscription::SubscriptionNode;
use crate::services::FetchResponse;
use crate::services::SubgraphRequest;
use crate::services::SubgraphServiceFactory;
use crate::services::fetch::ErrorMapping;
use crate::services::fetch::SubscriptionRequest;
use crate::services::subgraph::BoxGqlStream;
use crate::spec::Schema;
pub(crate) fn fetch_service_handle_subscription(
schema: Arc<Schema>,
subgraph_service_factory: Arc<SubgraphServiceFactory>,
request: SubscriptionRequest,
) -> Instrumented<BoxFuture<'static, Result<FetchResponse, BoxError>>> {
let SubscriptionRequest {
ref context,
subscription_node: SubscriptionNode {
ref service_name, ..
},
..
} = request;
let service_name = service_name.clone();
let fetch_time_offset = context.created_at.elapsed().as_nanos() as i64;
subscription_with_subgraph_service(schema, subgraph_service_factory, request).instrument(
tracing::info_span!(
SUBSCRIBE_SPAN_NAME,
"otel.kind" = "INTERNAL",
"apollo.subgraph.name" = service_name.as_ref(),
"apollo_private.sent_time_offset" = fetch_time_offset
),
)
}
fn subscription_with_subgraph_service(
schema: Arc<Schema>,
subgraph_service_factory: Arc<SubgraphServiceFactory>,
request: SubscriptionRequest,
) -> BoxFuture<'static, Result<crate::services::fetch::Response, BoxError>> {
let SubscriptionRequest {
context,
subscription_node,
current_dir,
sender,
variables,
supergraph_request,
subscription_handle,
subscription_config,
..
} = request;
let SubscriptionNode {
ref service_name,
ref operation,
ref operation_name,
..
} = subscription_node;
let service_name = service_name.clone();
if let Some(max_opened_subscriptions) = subscription_config
.as_ref()
.and_then(|s| s.max_opened_subscriptions)
&& OPENED_SUBSCRIPTIONS.load(Ordering::Relaxed) >= max_opened_subscriptions
{
return Box::pin(async {
Ok((
Value::default(),
vec![
Error::builder()
.message("can't open new subscription, limit reached")
.extension_code("SUBSCRIPTION_MAX_LIMIT")
.build(),
],
))
});
}
let mode = match subscription_config.as_ref() {
Some(config) => config
.mode
.get_subgraph_config(&service_name)
.map(|mode| (config.clone(), mode)),
None => {
return Box::pin(async {
Ok((
Value::default(),
vec![
Error::builder()
.message("subscription support is not enabled")
.extension_code("SUBSCRIPTION_DISABLED")
.build(),
],
))
});
}
};
let service = subgraph_service_factory
.create(&service_name)
.expect("we already checked that the service exists during planning; qed");
let uri = schema
.subgraph_url(service_name.as_ref())
.unwrap_or_else(|| {
panic!("schema uri for subgraph '{service_name}' should already have been checked")
})
.clone();
let (tx_handle, rx_handle) = mpsc::channel::<BoxGqlStream>(1);
let subscription_handle = subscription_handle
.as_ref()
.expect("checked in PlanNode; qed");
let subgraph_request = SubgraphRequest::builder()
.supergraph_request(supergraph_request.clone())
.subgraph_request(
http_ext::Request::builder()
.method(http::Method::POST)
.uri(uri)
.body(
crate::graphql::Request::builder()
.query(operation.as_serialized())
.and_operation_name(operation_name.as_ref().map(|n| n.to_string()))
.variables(variables.variables.clone())
.build(),
)
.build()
.expect("it won't fail because the url is correct and already checked; qed"),
)
.operation_kind(OperationKind::Subscription)
.context(context)
.subgraph_name(service_name.to_string())
.subscription_stream(tx_handle)
.and_connection_closed_signal(Some(subscription_handle.closed_signal.resubscribe()))
.build();
let mut subscription_handle = subscription_handle.clone();
Box::pin(async move {
let response = match mode {
Some((subscription_config, _mode)) => {
let subscription_params = SubscriptionTaskParams {
client_sender: sender,
subscription_handle: subscription_handle.clone(),
subscription_config: subscription_config.clone(),
stream_rx: rx_handle.into(),
};
let subscription_conf_tx =
match subscription_handle.subscription_conf_tx.take() {
Some(sc) => sc,
None => {
return Ok((
Value::default(),
vec![Error::builder()
.message("no subscription conf sender provided for a subscription")
.extension_code("NO_SUBSCRIPTION_CONF_TX")
.build()],
));
}
};
if let Err(err) = subscription_conf_tx.send(subscription_params).await {
return Ok((
Value::default(),
vec![
Error::builder()
.message(format!("cannot send the subscription data: {err:?}"))
.extension_code("SUBSCRIPTION_DATA_SEND_ERROR")
.build(),
],
));
}
match service
.oneshot(subgraph_request)
.instrument(tracing::trace_span!("subscription_call"))
.await
.map_to_graphql_error(service_name.to_string(), ¤t_dir)
{
Err(e) => {
failfast_error!("subgraph call fetch error: {}", e);
vec![e]
}
Ok(response) => response.response.into_parts().1.errors,
}
}
None => {
vec![
Error::builder()
.message(format!(
"subscription mode is not configured for subgraph {service_name:?}"
))
.extension_code("INVALID_SUBSCRIPTION_MODE")
.build(),
]
}
};
Ok((Value::default(), response))
})
}