use std::sync::Arc;
use std::task::Poll;
use futures::future::BoxFuture;
use serde_json_bytes::Value;
use tower::BoxError;
use tower::ServiceExt;
use tracing::Instrument;
use tracing::instrument::Instrumented;
use super::ConnectRequest;
use super::FetchRequest;
use super::SubgraphRequest;
use super::connector_service::ConnectorServiceFactory;
use super::fetch::AddSubgraphNameExt;
use super::fetch::BoxService;
use super::new_service::ServiceFactory;
use crate::configuration::HoistOrphanErrors;
use crate::configuration::subgraph::SubgraphConfiguration;
use crate::graphql::Request as GraphQLRequest;
use crate::http_ext;
use crate::plugins::subscription::SubscriptionConfig;
use crate::plugins::subscription::fetch_service_handle_subscription;
use crate::query_planner::FETCH_SPAN_NAME;
use crate::query_planner::build_operation_with_aliasing;
use crate::query_planner::fetch::FetchNode;
use crate::query_planner::fetch::SubgraphSchemas;
use crate::services::FetchResponse;
use crate::services::SubgraphServiceFactory;
use crate::services::fetch::ErrorMapping;
use crate::services::fetch::Request;
use crate::spec::Schema;
#[derive(Clone)]
pub(crate) struct FetchService {
pub(crate) subgraph_service_factory: Arc<SubgraphServiceFactory>,
pub(crate) schema: Arc<Schema>,
pub(crate) subgraph_schemas: Arc<SubgraphSchemas>,
pub(crate) _subscription_config: Option<SubscriptionConfig>, pub(crate) connector_service_factory: Arc<ConnectorServiceFactory>,
pub(crate) hoist_orphan_errors: Arc<SubgraphConfiguration<HoistOrphanErrors>>,
}
impl tower::Service<Request> for FetchService {
type Response = FetchResponse;
type Error = BoxError;
type Future = Instrumented<BoxFuture<'static, Result<Self::Response, Self::Error>>>;
fn poll_ready(&mut self, _cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, request: Request) -> Self::Future {
match request {
Request::Fetch(request) => self.handle_fetch(request),
Request::Subscription(request) => fetch_service_handle_subscription(
self.schema.clone(),
self.subgraph_service_factory.clone(),
request,
),
}
}
}
impl FetchService {
fn handle_fetch(
&mut self,
request: FetchRequest,
) -> <FetchService as tower::Service<Request>>::Future {
let FetchRequest {
ref context,
fetch_node: FetchNode {
ref service_name, ..
},
..
} = request;
let service_name = service_name.clone();
let fetch_time_offset = context.created_at.elapsed().as_nanos() as i64;
let hoist_orphan_errors = self.hoist_orphan_errors.get(&service_name).enabled;
if let Some(connector) = self
.connector_service_factory
.connectors_by_service_name
.get(service_name.as_ref())
{
Self::fetch_with_connector_service(
self.schema.clone(),
self.connector_service_factory.clone(),
connector.id.subgraph_name.clone(),
request,
hoist_orphan_errors,
)
.instrument(tracing::info_span!(
FETCH_SPAN_NAME,
"otel.kind" = "INTERNAL",
"apollo.subgraph.name" = connector.id.subgraph_name,
"apollo_private.sent_time_offset" = fetch_time_offset
))
} else {
Self::fetch_with_subgraph_service(
self.schema.clone(),
self.subgraph_service_factory.clone(),
self.subgraph_schemas.clone(),
request,
hoist_orphan_errors,
)
.instrument(tracing::info_span!(
FETCH_SPAN_NAME,
"otel.kind" = "INTERNAL",
"apollo.subgraph.name" = service_name.as_ref(),
"apollo_private.sent_time_offset" = fetch_time_offset
))
}
}
fn fetch_with_connector_service(
schema: Arc<Schema>,
connector_service_factory: Arc<ConnectorServiceFactory>,
subgraph_name: String,
request: FetchRequest,
hoist_orphan_errors: bool,
) -> BoxFuture<'static, Result<FetchResponse, BoxError>> {
let FetchRequest {
fetch_node,
supergraph_request,
variables,
context,
current_dir,
..
} = request;
let paths = variables.inverted_paths.clone();
let operation = fetch_node.operation.as_parsed().cloned();
Box::pin(async move {
let connector = schema
.connectors
.as_ref()
.and_then(|c| c.by_service_name.get(&fetch_node.service_name))
.ok_or("no connector found for service")?;
let keys = connector.resolvable_key(schema.supergraph_schema())?;
let (_parts, response) = match connector_service_factory
.create()
.oneshot(
ConnectRequest::builder()
.service_name(fetch_node.service_name.clone())
.context(context)
.operation(operation?.clone())
.supergraph_request(supergraph_request)
.variables(variables)
.and_keys(keys)
.build(),
)
.await
.map_to_graphql_error(subgraph_name.clone(), ¤t_dir.to_owned())
.with_subgraph_name(subgraph_name.as_str())
{
Err(e) => {
return Ok((Value::default(), vec![e]));
}
Ok(res) => res.response.into_parts(),
};
let (value, errors) = fetch_node.response_at_path(
&schema,
¤t_dir,
paths,
response,
hoist_orphan_errors,
);
Ok((value, errors))
})
}
fn fetch_with_subgraph_service(
schema: Arc<Schema>,
subgraph_service_factory: Arc<SubgraphServiceFactory>,
subgraph_schemas: Arc<SubgraphSchemas>,
request: FetchRequest,
hoist_orphan_errors: bool,
) -> BoxFuture<'static, Result<FetchResponse, BoxError>> {
let FetchRequest {
fetch_node,
supergraph_request,
variables,
current_dir,
context,
} = request;
let FetchNode {
ref service_name,
ref operation,
ref operation_kind,
ref operation_name,
..
} = fetch_node;
let uri = schema
.subgraph_url(service_name.as_ref())
.unwrap_or_else(|| {
panic!("schema uri for subgraph '{service_name}' should already have been checked")
})
.clone();
let alias_query_string; let aliased_operation = if let Some(ctx_arg) = &variables.contextual_arguments {
if let Some(subgraph_schema) = subgraph_schemas.get(&service_name.to_string()) {
match build_operation_with_aliasing(operation, ctx_arg, &subgraph_schema.schema) {
Ok(op) => {
alias_query_string = op.serialize().no_indent().to_string();
alias_query_string.as_str()
}
Err(errors) => {
tracing::debug!(
"couldn't generate a valid executable document? {:?}",
errors
);
operation.as_serialized()
}
}
} else {
tracing::debug!(
"couldn't find a subgraph schema for service {:?}",
&service_name
);
operation.as_serialized()
}
} else {
operation.as_serialized()
};
let aqs = aliased_operation.to_string(); let current_dir = current_dir.clone();
let service = subgraph_service_factory
.create(&service_name.clone())
.expect("we already checked that the service exists during planning; qed");
let mut subgraph_request = SubgraphRequest::builder()
.supergraph_request(supergraph_request.clone())
.subgraph_request(
http_ext::Request::builder()
.method(http::Method::POST)
.uri(uri)
.body(
GraphQLRequest::builder()
.query(aliased_operation)
.and_operation_name(operation_name.as_ref().map(|n| n.to_string()))
.variables(variables.variables.clone())
.build(),
)
.build()
.expect("it won't fail because the url is correct and already checked; qed"),
)
.subgraph_name(service_name.to_string())
.operation_kind(*operation_kind)
.and_executable_document(operation.as_parsed().ok().cloned())
.context(context.clone())
.build();
subgraph_request.query_hash = fetch_node.schema_aware_hash.clone();
subgraph_request.authorization = fetch_node.authorization.clone();
Box::pin(async move {
Ok(fetch_node
.subgraph_fetch(
service,
subgraph_request,
¤t_dir,
&schema,
variables.inverted_paths,
&aqs,
variables.variables,
hoist_orphan_errors,
)
.await)
})
}
}
#[derive(Clone)]
pub(crate) struct FetchServiceFactory {
pub(crate) schema: Arc<Schema>,
pub(crate) subgraph_schemas: Arc<SubgraphSchemas>,
pub(crate) subgraph_service_factory: Arc<SubgraphServiceFactory>,
pub(crate) subscription_config: Option<SubscriptionConfig>,
pub(crate) connector_service_factory: Arc<ConnectorServiceFactory>,
pub(crate) hoist_orphan_errors: Arc<SubgraphConfiguration<HoistOrphanErrors>>,
}
impl FetchServiceFactory {
pub(crate) fn new(
schema: Arc<Schema>,
subgraph_schemas: Arc<SubgraphSchemas>,
subgraph_service_factory: Arc<SubgraphServiceFactory>,
subscription_config: Option<SubscriptionConfig>,
connector_service_factory: Arc<ConnectorServiceFactory>,
hoist_orphan_errors: Arc<SubgraphConfiguration<HoistOrphanErrors>>,
) -> Self {
Self {
subgraph_service_factory,
subgraph_schemas,
schema,
subscription_config,
connector_service_factory,
hoist_orphan_errors,
}
}
}
impl ServiceFactory<Request> for FetchServiceFactory {
type Service = BoxService;
fn create(&self) -> Self::Service {
FetchService {
subgraph_service_factory: self.subgraph_service_factory.clone(),
schema: self.schema.clone(),
subgraph_schemas: self.subgraph_schemas.clone(),
_subscription_config: self.subscription_config.clone(),
connector_service_factory: self.connector_service_factory.clone(),
hoist_orphan_errors: self.hoist_orphan_errors.clone(),
}
.boxed()
}
}