use std::collections::HashMap;
use std::collections::HashSet;
use std::ops::ControlFlow;
use apollo_compiler::ExecutableDocument;
use apollo_compiler::ast;
use apollo_federation::link::spec::Identity;
use http::StatusCode;
use schemars::JsonSchema;
use serde::Deserialize;
use serde::Serialize;
use serde_json_bytes::Value;
use tower::BoxError;
use tower::ServiceBuilder;
use tower::ServiceExt;
use self::authenticated::AUTHENTICATED_SPEC_VERSION_RANGE;
use self::authenticated::AuthenticatedCheckVisitor;
use self::authenticated::AuthenticatedVisitor;
use self::policy::POLICY_SPEC_VERSION_RANGE;
use self::policy::PolicyExtractionVisitor;
use self::policy::PolicyFilteringVisitor;
use self::scopes::REQUIRES_SCOPES_SPEC_VERSION_RANGE;
use self::scopes::ScopeExtractionVisitor;
use self::scopes::ScopeFilteringVisitor;
use crate::Configuration;
use crate::Context;
use crate::error::QueryPlannerError;
use crate::error::ServiceBuildError;
use crate::graphql;
use crate::json_ext::Path;
use crate::layers::ServiceBuilderExt;
use crate::plugin::Plugin;
use crate::plugin::PluginInit;
use crate::plugins::authentication::APOLLO_AUTHENTICATION_JWT_CLAIMS;
use crate::query_planner::FilteredQuery;
use crate::query_planner::QueryKey;
use crate::services::execution;
use crate::services::layers::query_analysis::ParsedDocumentInner;
use crate::services::supergraph;
use crate::spec::Schema;
use crate::spec::SpecError;
use crate::spec::query::transform;
use crate::spec::query::traverse;
pub(crate) mod authenticated;
pub(crate) mod policy;
pub(crate) mod scopes;
/// Context key under which `AuthorizationPlugin::query_analysis` stores `true` when the
/// analyzed operation's metadata has `is_authenticated` set. Only its presence is
/// checked afterwards (see `Plugin::execution_service`).
pub(crate) const AUTHENTICATION_REQUIRED_KEY: &str =
"apollo::authorization::authentication_required";
/// Context key under which `AuthorizationPlugin::query_analysis` stores the non-empty
/// list of scopes extracted from the operation. `AuthorizationPlugin::update_cache_key`
/// reads it back as an array of strings.
pub(crate) const REQUIRED_SCOPES_KEY: &str = "apollo::authorization::required_scopes";
/// Context key under which `AuthorizationPlugin::query_analysis` stores a map from each
/// extracted policy name to `None`. `AuthorizationPlugin::update_cache_key` later keeps
/// only the entries whose value is the boolean `true`.
pub(crate) const REQUIRED_POLICIES_KEY: &str = "apollo::authorization::required_policies";
/// Authorization facts attached to a query or a request.
///
/// The type derives `Hash`/`Eq` over its `Vec` fields, so two values holding the same
/// scopes or policies in a different order are *not* equal.
///
/// Two producers exist in this module and they fill the fields differently:
/// - `AuthorizationPlugin::generate_cache_metadata` describes what an *operation*
///   requires (extracted from the document).
/// - `AuthorizationPlugin::update_cache_key` describes what the *request* provides
///   and stores the value in the context extensions.
#[derive(Clone, Debug, Default, Hash, PartialEq, Eq, Serialize, Deserialize)]
pub(crate) struct CacheKeyMetadata {
/// From `generate_cache_metadata`: the authenticated-check visitor found a match.
/// From `update_cache_key`: the context contains the JWT claims entry.
pub(crate) is_authenticated: bool,
/// Scope names (sorted when produced by `update_cache_key`).
pub(crate) scopes: Vec<String>,
/// Policy names (sorted when produced by `update_cache_key`).
pub(crate) policies: Vec<String>,
}
// Configuration of the authorization plugin, exposed in the JSON schema as
// `AuthorizationConfig`. Every field falls back to its default when omitted.
// (Plain `//` comments on purpose: `///` docs on a `JsonSchema` type are emitted as
// descriptions in the generated schema.)
#[derive(Clone, Debug, serde_derive_default::Default, Deserialize, JsonSchema)]
#[schemars(rename = "AuthorizationConfig")]
pub(crate) struct Conf {
// When true, `Plugin::supergraph_service` answers 401 to every request whose context
// has no JWT claims entry. Defaults to false.
#[serde(default)]
require_authentication: bool,
// Settings for the directive-based authorization (see `Directives`).
#[serde(default)]
directives: Directives,
}
impl Conf {
pub(crate) fn error_config(&self) -> ErrorConfig {
self.directives.errors
}
}
// Settings for directive-based authorization, exposed in the JSON schema as
// `AuthorizationDirectivesConfig`.
// (Plain `//` comments on purpose: `///` docs on a `JsonSchema` type are emitted as
// descriptions in the generated schema.)
#[derive(Copy, Clone, Debug, serde_derive_default::Default, Deserialize, JsonSchema)]
#[schemars(rename = "AuthorizationDirectivesConfig")]
pub(crate) struct Directives {
// Master switch read by `AuthorizationPlugin::enable_directives`. Defaults to true
// (via `default_enable_directives`).
#[serde(default = "default_enable_directives")]
enabled: bool,
// Forwarded unchanged to the filtering visitors by `AuthorizationPlugin::filter_query`.
// NOTE(review): presumably makes the visitors report unauthorized paths without
// removing them from the query — confirm in the visitor implementations.
#[serde(default)]
dry_run: bool,
// When true, `AuthorizationPlugin::filter_query` fails with
// `QueryPlannerError::Unauthorized` as soon as any unauthorized path was recorded,
// instead of returning the filtered query.
#[serde(default)]
reject_unauthorized: bool,
// How authorization errors are logged and reported (see `ErrorConfig`).
#[serde(default)]
errors: ErrorConfig,
}
// Controls how unauthorized paths are surfaced, exposed in the JSON schema as
// `AuthorizationErrorConfig`.
// (Plain `//` comments on purpose: `///` docs on a `JsonSchema` type are emitted as
// descriptions in the generated schema.)
#[derive(
Copy,
Clone,
Debug,
serde_derive_default::Default,
PartialEq,
Eq,
Serialize,
Deserialize,
JsonSchema,
)]
#[schemars(rename = "AuthorizationErrorConfig")]
pub(crate) struct ErrorConfig {
// Whether `UnauthorizedPaths::log_unauthorized_paths` emits its ERROR event.
// Defaults to true (via `enable_log_errors`).
#[serde(default = "enable_log_errors")]
pub(crate) log: bool,
// Where `UnauthorizedPaths::update_response_with_unauthorized_path_errors` puts the
// generated errors. Defaults to `ErrorLocation::Errors`.
#[serde(default)]
pub(crate) response: ErrorLocation,
}
/// Serde default for `ErrorConfig::log`: logging of authorization errors is on unless
/// the configuration turns it off.
fn enable_log_errors() -> bool {
true
}
// Destination of the per-path authorization errors in a GraphQL response. Serialized
// in snake_case (`errors`, `extensions`, `disabled`).
// (Plain `//` comments on purpose: `///` docs on a `JsonSchema` type are emitted as
// descriptions in the generated schema.)
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "snake_case")]
pub(crate) enum ErrorLocation {
// Default: append the errors to the response's `errors` list.
#[default]
Errors,
// Store the serialized errors under the `authorizationErrors` extension key.
Extensions,
// Do not add anything to the response.
Disabled,
}
/// The response paths that were found to be unauthorized, bundled with the settings
/// that say how they must be logged and reported.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub(crate) struct UnauthorizedPaths {
/// Paths of the unauthorized fields or types.
pub(crate) paths: Vec<Path>,
/// Logging / response-placement settings applied to `paths`.
pub(crate) errors: ErrorConfig,
}
impl UnauthorizedPaths {
pub(crate) fn log_unauthorized_paths(&self) {
if self.paths.is_empty() || !self.errors.log {
return;
}
tracing::Span::current().in_scope(|| {
let unauthorized_paths = self
.paths
.iter()
.map(|path| path.to_string())
.collect::<Vec<_>>();
tracing::event!(tracing_core::Level::ERROR, unauthorized_query_paths = ?unauthorized_paths, "Authorization error",);
})
}
pub(crate) fn update_response_with_unauthorized_path_errors(
&self,
response: &mut graphql::Response,
) {
let unauthorized_path_errors = self.paths.iter().map(|path| {
graphql::Error::builder()
.message("Unauthorized field or type")
.path(path.clone())
.extension_code("UNAUTHORIZED_FIELD_OR_TYPE")
.build()
});
match self.errors.response {
ErrorLocation::Errors => {
response.errors.extend(unauthorized_path_errors);
}
ErrorLocation::Extensions => {
let serialized_auth_errors = unauthorized_path_errors
.map(|err| {
serde_json_bytes::to_value(err)
.expect("error serialization should not fail")
})
.collect();
response
.extensions
.insert("authorizationErrors", Value::Array(serialized_auth_errors));
}
ErrorLocation::Disabled => {}
}
}
}
/// Serde default for `Directives::enabled`: authorization directives are honoured
/// unless the configuration turns them off.
fn default_enable_directives() -> bool {
true
}
/// The authorization plugin. Besides its `Plugin` implementation it hosts the
/// associated functions used for query analysis, cache-key computation and query
/// filtering.
pub(crate) struct AuthorizationPlugin {
/// Copied from `Conf::require_authentication` at construction; when set, the
/// supergraph service rejects requests whose context has no JWT claims entry.
require_authentication: bool,
}
impl AuthorizationPlugin {
/// Tells whether directive-based authorization must be active.
///
/// That is the case when the `directives.enabled` setting is on *and* the schema
/// links at least one of the `@authenticated`, `@requiresScopes` or `@policy` specs
/// in a supported version range.
///
/// The current implementation never returns `Err`.
pub(crate) fn enable_directives(
    configuration: &Configuration,
    schema: &Schema,
) -> Result<bool, ServiceBuildError> {
    let directives_enabled = Self::configuration(configuration).directives.enabled;

    // Evaluated lazily and in this order, so later lookups are skipped once one matches.
    let links_authenticated = || {
        schema.has_spec(
            &Identity::authenticated_identity(),
            AUTHENTICATED_SPEC_VERSION_RANGE,
        )
    };
    let links_requires_scopes = || {
        schema.has_spec(
            &Identity::requires_scopes_identity(),
            REQUIRES_SCOPES_SPEC_VERSION_RANGE,
        )
    };
    let links_policy =
        || schema.has_spec(&Identity::policy_identity(), POLICY_SPEC_VERSION_RANGE);

    let schema_uses_directives =
        links_authenticated() || links_requires_scopes() || links_policy();

    Ok(directives_enabled && schema_uses_directives)
}
pub(crate) fn configuration(configuration: &Configuration) -> Conf {
configuration
.apollo_plugins
.plugins
.get("authorization")
.and_then(|v| serde_json::from_value(v.clone()).ok())
.unwrap_or_default()
}
/// Records in `context` what the operation requires in terms of authorization.
///
/// The requirements are computed with `generate_cache_metadata` (non-entity query)
/// and written under three keys, each only when relevant:
/// - `AUTHENTICATION_REQUIRED_KEY` → `true`;
/// - `REQUIRED_SCOPES_KEY` → the list of scopes;
/// - `REQUIRED_POLICIES_KEY` → a map from each policy to `None`.
///
/// # Panics
/// Panics if inserting into the context fails.
pub(crate) fn query_analysis(
    doc: &ParsedDocumentInner,
    operation_name: Option<&str>,
    schema: &Schema,
    context: &Context,
) {
    let requirements = Self::generate_cache_metadata(
        &doc.executable,
        operation_name,
        schema.supergraph_schema(),
        false,
    );

    if requirements.is_authenticated {
        context.insert(AUTHENTICATION_REQUIRED_KEY, true).unwrap();
    }

    if !requirements.scopes.is_empty() {
        context
            .insert(REQUIRED_SCOPES_KEY, requirements.scopes)
            .unwrap();
    }

    if !requirements.policies.is_empty() {
        // Every policy starts without a verdict (`None`).
        let mut pending: HashMap<String, Option<bool>> = HashMap::new();
        for policy in requirements.policies {
            pending.insert(policy, None);
        }
        context.insert(REQUIRED_POLICIES_KEY, pending).unwrap();
    }
}
/// Computes the authorization requirements of an operation.
///
/// Runs three independent traversals of `document` (restricted to `operation_name`):
/// one checking for authentication requirements, one extracting scopes and one
/// extracting policies. A visitor that cannot be created, or a traversal that fails,
/// simply contributes nothing (`false` / empty list).
///
/// The order of the returned `scopes` and `policies` is whatever the visitors'
/// collections yield; it is not sorted here.
pub(crate) fn generate_cache_metadata(
    document: &ExecutableDocument,
    operation_name: Option<&str>,
    schema: &apollo_compiler::Schema,
    entity_query: bool,
) -> CacheKeyMetadata {
    let is_authenticated = match AuthenticatedCheckVisitor::new(schema, document, entity_query)
    {
        Some(mut checker) => {
            traverse::document(&mut checker, document, operation_name).is_ok() && checker.found
        }
        None => false,
    };

    let scopes: Vec<String> = ScopeExtractionVisitor::new(schema, document, entity_query)
        .and_then(|mut extractor| {
            traverse::document(&mut extractor, document, operation_name)
                .is_ok()
                .then(|| extractor.extracted_scopes.into_iter().collect())
        })
        .unwrap_or_default();

    let policies: Vec<String> = PolicyExtractionVisitor::new(schema, document, entity_query)
        .and_then(|mut extractor| {
            traverse::document(&mut extractor, document, operation_name)
                .is_ok()
                .then(|| extractor.extracted_policies.into_iter().collect())
        })
        .unwrap_or_default();

    CacheKeyMetadata {
        is_authenticated,
        scopes,
        policies,
    }
}
/// Computes what the request provides in terms of authorization and stores it, as a
/// `CacheKeyMetadata`, in the context extensions.
///
/// - `is_authenticated`: the context holds a JWT claims entry;
/// - `scopes`: the scopes of the claims' space-separated `scope` string that are also
///   listed under `REQUIRED_SCOPES_KEY`, sorted (empty if either side is missing);
/// - `policies`: the names under `REQUIRED_POLICIES_KEY` whose value is `true`, sorted.
pub(crate) fn update_cache_key(context: &Context) {
    let is_authenticated = context.contains_key(APOLLO_AUTHENTICATION_JWT_CLAIMS);

    // Scopes granted by the token, if the claims carry a string `scope` entry.
    let granted_scopes: Option<HashSet<String>> = context
        .get_json_value(APOLLO_AUTHENTICATION_JWT_CLAIMS)
        .and_then(|claims| {
            let scope_claim = claims.as_object()?.get("scope")?.as_str()?;
            Some(scope_claim.split(' ').map(String::from).collect())
        });

    // Scopes the operation asks for; non-string array entries are skipped.
    let required_scopes: Option<HashSet<String>> = context
        .get_json_value(REQUIRED_SCOPES_KEY)
        .and_then(|required| {
            let entries = required.as_array()?;
            Some(
                entries
                    .iter()
                    .filter_map(|entry| entry.as_str())
                    .map(String::from)
                    .collect(),
            )
        });

    let mut scopes: Vec<String> = match granted_scopes.zip(required_scopes) {
        Some((granted, required)) => granted.intersection(&required).cloned().collect(),
        None => Vec::new(),
    };
    scopes.sort();

    // Only policies explicitly marked `true` count as satisfied.
    let mut policies: Vec<String> = context
        .get_json_value(REQUIRED_POLICIES_KEY)
        .and_then(|required| {
            let verdicts = required.as_object()?;
            Some(
                verdicts
                    .iter()
                    .filter_map(|(policy, verdict)| {
                        matches!(verdict, Value::Bool(true)).then(|| policy.as_str().to_string())
                    })
                    .collect(),
            )
        })
        .unwrap_or_default();
    policies.sort();

    let metadata = CacheKeyMetadata {
        is_authenticated,
        scopes,
        policies,
    };
    context.extensions().with_lock(|lock| lock.insert(metadata));
}
/// Combines two `CacheKeyMetadata` values by keeping only what both have in common.
///
/// - `is_authenticated` is the logical AND of both flags;
/// - `scopes` and `policies` are the set intersections of the respective lists.
///
/// The returned lists are deduplicated and sorted. `CacheKeyMetadata` hashes and
/// compares its lists element by element, so a stable order is required for two
/// equal intersections to produce equal (and equally hashed) values; this also matches
/// the sorted lists produced by `update_cache_key`.
pub(crate) fn intersect_cache_keys_subgraph(
    left: &CacheKeyMetadata,
    right: &CacheKeyMetadata,
) -> CacheKeyMetadata {
    /// Returns the elements present in both slices, deduplicated and sorted.
    fn sorted_intersection(left: &[String], right: &[String]) -> Vec<String> {
        let right: HashSet<&str> = right.iter().map(String::as_str).collect();
        let mut common: Vec<String> = left
            .iter()
            .filter(|item| right.contains(item.as_str()))
            .cloned()
            .collect();
        // Sorting first makes duplicates adjacent so `dedup` removes all of them.
        common.sort_unstable();
        common.dedup();
        common
    }

    CacheKeyMetadata {
        is_authenticated: left.is_authenticated && right.is_authenticated,
        scopes: sorted_intersection(&left.scopes, &right.scopes),
        policies: sorted_intersection(&left.policies, &right.policies),
    }
}
pub(crate) fn filter_query(
configuration: &Conf,
key: &QueryKey,
schema: &Schema,
) -> Result<Option<FilteredQuery>, QueryPlannerError> {
let reject_unauthorized = configuration.directives.reject_unauthorized;
let dry_run = configuration.directives.dry_run;
let doc = ast::Document::parse(&key.filtered_query, "filtered_query")
.unwrap_or_else(|invalid| invalid.partial);
let is_authenticated = key.metadata.is_authenticated;
let scopes = &key.metadata.scopes;
let policies = &key.metadata.policies;
let mut is_filtered = false;
let mut unauthorized_paths: Vec<Path> = vec![];
let filter_res = Self::authenticated_filter_query(schema, dry_run, &doc, is_authenticated)?;
let doc = match filter_res {
None => doc,
Some((filtered_doc, paths)) => {
unauthorized_paths.extend(paths);
if filtered_doc.definitions.is_empty() {
return Err(QueryPlannerError::Unauthorized(unauthorized_paths));
}
is_filtered = true;
filtered_doc
}
};
let filter_res = Self::scopes_filter_query(schema, dry_run, &doc, scopes)?;
let doc = match filter_res {
None => doc,
Some((filtered_doc, paths)) => {
unauthorized_paths.extend(paths);
if filtered_doc.definitions.is_empty() {
return Err(QueryPlannerError::Unauthorized(unauthorized_paths));
}
is_filtered = true;
filtered_doc
}
};
let filter_res = Self::policies_filter_query(schema, dry_run, &doc, policies)?;
let doc = match filter_res {
None => doc,
Some((filtered_doc, paths)) => {
unauthorized_paths.extend(paths);
if filtered_doc.definitions.is_empty() {
return Err(QueryPlannerError::Unauthorized(unauthorized_paths));
}
is_filtered = true;
filtered_doc
}
};
if reject_unauthorized && !unauthorized_paths.is_empty() {
return Err(QueryPlannerError::Unauthorized(unauthorized_paths));
}
if is_filtered {
Ok(Some((unauthorized_paths, doc)))
} else {
Ok(None)
}
}
/// Authentication stage of `filter_query`.
///
/// Returns `Ok(None)` (query kept as-is) when the visitor cannot be created for this
/// schema, when the query does not require authentication, or when it does and the
/// request is authenticated. Otherwise returns the transformed document together
/// with the unauthorized paths collected by the visitor.
///
/// A failing transformation is reported as `SpecError::TransformError`.
fn authenticated_filter_query(
    schema: &Schema,
    dry_run: bool,
    doc: &ast::Document,
    is_authenticated: bool,
) -> Result<Option<(ast::Document, Vec<Path>)>, QueryPlannerError> {
    let Some(mut visitor) = AuthenticatedVisitor::new(
        schema.supergraph_schema(),
        &schema.implementers_map,
        dry_run,
    ) else {
        tracing::debug!("the schema does not contain @authenticated");
        return Ok(None);
    };

    // The transformation always runs: it is what tells whether the query needs
    // authentication at all.
    let modified_query = transform::document(&mut visitor, doc)
        .map_err(|err| SpecError::TransformError(err.to_string()))?;

    if !visitor.query_requires_authentication {
        tracing::debug!("the query does not contain @authenticated");
        return Ok(None);
    }

    if is_authenticated {
        tracing::debug!(
            "the query contains @authenticated, the request is authenticated, keeping the query"
        );
        return Ok(None);
    }

    tracing::debug!(
        "the query contains @authenticated, modified query:\n{modified_query}\nunauthorized paths: {:?}",
        visitor
            .unauthorized_paths
            .iter()
            .map(|unauthorized| unauthorized.to_string())
            .collect::<Vec<_>>()
    );
    Ok(Some((modified_query, visitor.unauthorized_paths)))
}
/// Scopes stage of `filter_query`.
///
/// Returns `Ok(None)` when the visitor cannot be created for this schema or when the
/// query does not require scopes. Otherwise returns the transformed document together
/// with the unauthorized paths collected by the visitor, given the request's `scopes`.
///
/// A failing transformation is reported as `SpecError::TransformError`.
fn scopes_filter_query(
    schema: &Schema,
    dry_run: bool,
    doc: &ast::Document,
    scopes: &[String],
) -> Result<Option<(ast::Document, Vec<Path>)>, QueryPlannerError> {
    let Some(mut visitor) = ScopeFilteringVisitor::new(
        schema.supergraph_schema(),
        &schema.implementers_map,
        scopes.iter().cloned().collect(),
        dry_run,
    ) else {
        tracing::debug!("the schema does not contain @requiresScopes");
        return Ok(None);
    };

    let modified_query = transform::document(&mut visitor, doc)
        .map_err(|err| SpecError::TransformError(err.to_string()))?;

    if !visitor.query_requires_scopes {
        tracing::debug!("the query does not require scopes");
        return Ok(None);
    }

    tracing::debug!(
        "the query required scopes, the requests present scopes: {scopes:?}, modified query:\n{modified_query}\nunauthorized paths: {:?}",
        visitor
            .unauthorized_paths
            .iter()
            .map(|unauthorized| unauthorized.to_string())
            .collect::<Vec<_>>()
    );
    Ok(Some((modified_query, visitor.unauthorized_paths)))
}
/// Policies stage of `filter_query`.
///
/// Returns `Ok(None)` when the visitor cannot be created for this schema or when the
/// query does not require policies. Otherwise returns the transformed document
/// together with the unauthorized paths collected by the visitor, given the
/// request's `policies`.
///
/// A failing transformation is reported as `SpecError::TransformError`.
fn policies_filter_query(
    schema: &Schema,
    dry_run: bool,
    doc: &ast::Document,
    policies: &[String],
) -> Result<Option<(ast::Document, Vec<Path>)>, QueryPlannerError> {
    let Some(mut visitor) = PolicyFilteringVisitor::new(
        schema.supergraph_schema(),
        &schema.implementers_map,
        policies.iter().cloned().collect(),
        dry_run,
    ) else {
        tracing::debug!("the schema does not contain @policy");
        return Ok(None);
    };

    let modified_query = transform::document(&mut visitor, doc)
        .map_err(|err| SpecError::TransformError(err.to_string()))?;

    if !visitor.query_requires_policies {
        tracing::debug!("the query does not require policies");
        return Ok(None);
    }

    tracing::debug!(
        "the query required policies, the requests present policies: {policies:?}, modified query:\n{modified_query}\nunauthorized paths: {:?}",
        visitor
            .unauthorized_paths
            .iter()
            .map(|unauthorized| unauthorized.to_string())
            .collect::<Vec<_>>()
    );
    Ok(Some((modified_query, visitor.unauthorized_paths)))
}
}
#[async_trait::async_trait]
impl Plugin for AuthorizationPlugin {
type Config = Conf;
async fn new(init: PluginInit<Self::Config>) -> Result<Self, BoxError> {
Ok(AuthorizationPlugin {
require_authentication: init.config.require_authentication,
})
}
fn supergraph_service(&self, service: supergraph::BoxService) -> supergraph::BoxService {
if self.require_authentication {
ServiceBuilder::new()
.checkpoint(move |request: supergraph::Request| {
if request
.context
.contains_key(APOLLO_AUTHENTICATION_JWT_CLAIMS)
{
Ok(ControlFlow::Continue(request))
} else {
tracing::error!("rejecting unauthenticated request");
let response = supergraph::Response::error_builder()
.error(
graphql::Error::builder()
.message("unauthenticated".to_string())
.extension_code("AUTH_ERROR")
.build(),
)
.status_code(StatusCode::UNAUTHORIZED)
.context(request.context)
.build()?;
Ok(ControlFlow::Break(response))
}
})
.service(service)
.boxed()
} else {
service
}
}
fn execution_service(&self, service: execution::BoxService) -> execution::BoxService {
ServiceBuilder::new()
.map_request(|request: execution::Request| {
let filtered = !request.query_plan.query.unauthorized.paths.is_empty();
let needs_authenticated = request.context.contains_key(AUTHENTICATION_REQUIRED_KEY);
let needs_requires_scopes = request.context.contains_key(REQUIRED_SCOPES_KEY);
if needs_authenticated || needs_requires_scopes {
u64_counter!(
"apollo.router.operations.authorization",
"Number of subgraph requests requiring authorization",
1,
authorization.filtered = filtered,
authorization.needs_authenticated = needs_authenticated,
authorization.needs_requires_scopes = needs_requires_scopes
);
}
request
})
.service(service)
.boxed()
}
}
// Registers `AuthorizationPlugin` through the `register_plugin!` macro with the
// identifiers "apollo" and "authorization"; "authorization" is also the key that
// `AuthorizationPlugin::configuration` looks up in the router configuration.
register_plugin!("apollo", "authorization", AuthorizationPlugin);
#[cfg(test)]
mod tests;