#![allow(clippy::result_large_err, clippy::const_is_empty)]
use async_graphql::{Name, Value as GqlValue};
use grpc_graphql_gateway::{Gateway, GatewayBuilder, GrpcClient, Result as GatewayResult};
use std::collections::HashMap;
use std::net::{SocketAddr, ToSocketAddrs};
use std::sync::Arc;
use tokio::net::TcpListener;
use tonic::{transport::Server, Request, Response, Status};
use tracing_subscriber::prelude::*;
type ServiceResult<T> = std::result::Result<T, Status>;
const DESCRIPTOR_SET: &[u8] = include_bytes!("../src/generated/federation_example_descriptor.bin");
const DEFAULT_GRPC_ADDR: &str = "0.0.0.0:50051";
fn describe(list: &[&str]) -> String {
if list.is_empty() {
"none".to_string()
} else {
list.join(", ")
}
}
fn describe_resolvers(list: &[&str]) -> String {
if list.is_empty() {
"none".to_string()
} else {
list.join(", ")
}
}
fn listen_addr(endpoint: &str, fallback: &str) -> GatewayResult<SocketAddr> {
let mut addr = endpoint.trim();
if let Some(stripped) = addr
.strip_prefix("http://")
.or_else(|| addr.strip_prefix("https://"))
{
addr = stripped;
}
if let Some((host, _rest)) = addr.split_once('/') {
addr = host;
}
if let Ok(sock) = addr.parse() {
return Ok(sock);
}
if let Ok(mut iter) = addr.to_socket_addrs() {
if let Some(sock) = iter.next() {
return Ok(sock);
}
}
fallback
.parse()
.map_err(|e| grpc_graphql_gateway::Error::Other(anyhow::Error::new(e)))
}
fn describe_key_sets(keys: &[&[&str]]) -> String {
if keys.is_empty() {
"none".to_string()
} else {
keys.iter()
.map(|set| set.join(" "))
.collect::<Vec<_>>()
.join(" | ")
}
}
fn describe_entities() -> String {
if ENTITY_CONFIGS.is_empty() {
"none".to_string()
} else {
ENTITY_CONFIGS
.iter()
.map(|e| format!("{} (keys: {})", e.type_name, describe_key_sets(e.keys)))
.collect::<Vec<_>>()
.join(", ")
}
}
const QUERIES: &[&str] = &["product", "review", "user", "userReviews"];
const MUTATIONS: &[&str] = &[];
const SUBSCRIPTIONS: &[&str] = &[];
const RESOLVERS: &[&str] = &["_entities"];
#[allow(dead_code)]
const FEDERATION_ENABLED: bool = true;
pub struct EntityConfigInfo {
pub type_name: &'static str,
pub keys: &'static [&'static [&'static str]],
pub extend: bool,
pub resolvable: bool,
}
pub const ENTITY_CONFIGS: &[EntityConfigInfo] = &[
EntityConfigInfo {
type_name: "federation_example_Product",
keys: &[&["upc"]],
extend: false,
resolvable: true,
},
EntityConfigInfo {
type_name: "federation_example_Review",
keys: &[&["id"]],
extend: false,
resolvable: true,
},
EntityConfigInfo {
type_name: "federation_example_User",
keys: &[&["id"]],
extend: false,
resolvable: true,
},
EntityConfigInfo {
type_name: "federation_example_UserExtension",
keys: &[&["id"]],
extend: true,
resolvable: false,
},
];
pub mod federation_example {
include!("../src/generated/federation_example.rs");
}
use federation_example::product_service_server::{ProductService, ProductServiceServer};
use federation_example::review_service_server::{ReviewService, ReviewServiceServer};
use federation_example::user_service_server::{UserService, UserServiceServer};
use federation_example::{Product, Review, User};
pub struct ServiceConfig {
pub name: &'static str,
pub endpoint: &'static str,
pub insecure: bool,
pub queries: &'static [&'static str],
pub mutations: &'static [&'static str],
pub subscriptions: &'static [&'static str],
pub resolvers: &'static [&'static str],
}
pub mod services {
use super::ServiceConfig;
pub const FEDERATION_EXAMPLE_PRODUCTSERVICE: ServiceConfig = ServiceConfig {
name: "federation_example.ProductService",
endpoint: "http://localhost:50052",
insecure: true,
queries: &["product"],
mutations: &[],
subscriptions: &[],
resolvers: &["_entities"],
};
pub const FEDERATION_EXAMPLE_REVIEWSERVICE: ServiceConfig = ServiceConfig {
name: "federation_example.ReviewService",
endpoint: "http://localhost:50053",
insecure: true,
queries: &["review", "userReviews"],
mutations: &[],
subscriptions: &[],
resolvers: &["_entities"],
};
pub const FEDERATION_EXAMPLE_USERSERVICE: ServiceConfig = ServiceConfig {
name: "federation_example.UserService",
endpoint: "http://localhost:50051",
insecure: true,
queries: &["user"],
mutations: &[],
subscriptions: &[],
resolvers: &["_entities"],
};
pub const ALL: &[ServiceConfig] = &[
FEDERATION_EXAMPLE_PRODUCTSERVICE,
FEDERATION_EXAMPLE_REVIEWSERVICE,
FEDERATION_EXAMPLE_USERSERVICE,
];
}
#[derive(Clone, Default)]
pub struct FederationExampleEntityResolver;
#[async_trait::async_trait]
impl grpc_graphql_gateway::EntityResolver for FederationExampleEntityResolver {
async fn resolve_entity(
&self,
_entity_config: &grpc_graphql_gateway::federation::EntityConfig,
representation: &async_graphql::indexmap::IndexMap<Name, GqlValue>,
) -> grpc_graphql_gateway::Result<GqlValue> {
let mut obj = representation.clone();
obj.shift_remove(&Name::new("__typename"));
Ok(GqlValue::Object(obj))
}
async fn batch_resolve_entities(
&self,
entity_config: &grpc_graphql_gateway::federation::EntityConfig,
representations: Vec<async_graphql::indexmap::IndexMap<Name, GqlValue>>,
) -> grpc_graphql_gateway::Result<Vec<GqlValue>> {
let mut results = Vec::with_capacity(representations.len());
for repr in representations {
results.push(self.resolve_entity(entity_config, &repr).await?);
}
Ok(results)
}
}
fn default_entity_resolver() -> Arc<dyn grpc_graphql_gateway::EntityResolver> {
Arc::new(FederationExampleEntityResolver)
}
#[derive(Clone, Default)]
struct ExampleData {
users: HashMap<String, User>,
products: HashMap<String, Product>,
reviews: HashMap<String, Review>,
}
impl ExampleData {
fn seed() -> Self {
let mut users = HashMap::new();
let mut products = HashMap::new();
let mut reviews = HashMap::new();
let alice = User {
id: "u1".to_string(),
email: "alice@example.com".to_string(),
name: "Alice".to_string(),
};
let bob = User {
id: "u2".to_string(),
email: "bob@example.com".to_string(),
name: "Bob".to_string(),
};
users.insert(alice.id.clone(), alice.clone());
users.insert(bob.id.clone(), bob.clone());
let rocket = Product {
upc: "apollo-1".to_string(),
name: "Apollo Rocket".to_string(),
price: 499,
created_by: Some(alice.clone()),
};
let satchel = Product {
upc: "astro-42".to_string(),
name: "Astro Satchel".to_string(),
price: 149,
created_by: Some(bob.clone()),
};
products.insert(rocket.upc.clone(), rocket.clone());
products.insert(satchel.upc.clone(), satchel.clone());
reviews.insert(
"r1".to_string(),
Review {
id: "r1".to_string(),
product: Some(rocket),
author: Some(bob.clone()),
body: "Launches straight and true.".to_string(),
rating: 5,
},
);
reviews.insert(
"r2".to_string(),
Review {
id: "r2".to_string(),
product: Some(satchel),
author: Some(alice),
body: "Fits every mission checklist.".to_string(),
rating: 4,
},
);
Self {
users,
products,
reviews,
}
}
}
#[derive(Clone)]
pub struct ServiceImpl {
data: Arc<ExampleData>,
}
impl Default for ServiceImpl {
fn default() -> Self {
Self {
data: Arc::new(ExampleData::seed()),
}
}
}
#[tonic::async_trait]
impl ProductService for ServiceImpl {
async fn get_product(
&self,
request: Request<federation_example::GetProductRequest>,
) -> ServiceResult<Response<federation_example::GetProductResponse>> {
let upc = request.into_inner().upc;
let product = self.data.products.get(&upc).cloned();
Ok(Response::new(federation_example::GetProductResponse {
product,
}))
}
}
#[tonic::async_trait]
impl ReviewService for ServiceImpl {
async fn get_review(
&self,
request: Request<federation_example::GetReviewRequest>,
) -> ServiceResult<Response<federation_example::GetReviewResponse>> {
let id = request.into_inner().id;
let review = self.data.reviews.get(&id).cloned();
Ok(Response::new(federation_example::GetReviewResponse {
review,
}))
}
async fn get_user_reviews(
&self,
request: Request<federation_example::GetUserReviewsRequest>,
) -> ServiceResult<Response<federation_example::GetUserReviewsResponse>> {
let user_id = request.into_inner().user_id;
let reviews = self
.data
.reviews
.values()
.filter(|review| {
review
.author
.as_ref()
.map(|user| user.id == user_id)
.unwrap_or(false)
})
.cloned()
.collect();
Ok(Response::new(federation_example::GetUserReviewsResponse {
reviews,
}))
}
}
#[tonic::async_trait]
impl UserService for ServiceImpl {
async fn get_user(
&self,
request: Request<federation_example::GetUserRequest>,
) -> ServiceResult<Response<federation_example::GetUserResponse>> {
let id = request.into_inner().id;
let user = self.data.users.get(&id).cloned();
Ok(Response::new(federation_example::GetUserResponse { user }))
}
}
pub async fn run_services() -> GatewayResult<()> {
let mut handles = Vec::new();
{
let addr: SocketAddr = listen_addr("localhost:50052", DEFAULT_GRPC_ADDR)?;
let service = ServiceImpl::default();
tracing::info!(
"gRPC service federation_example.ProductService listening on {}",
addr
);
let handle = tokio::spawn(async move {
Server::builder()
.add_service(ProductServiceServer::new(service.clone()))
.serve(addr)
.await
.map_err(|e| grpc_graphql_gateway::Error::Other(anyhow::Error::new(e)))
});
handles.push(handle);
}
{
let addr: SocketAddr = listen_addr("localhost:50053", DEFAULT_GRPC_ADDR)?;
let service = ServiceImpl::default();
tracing::info!(
"gRPC service federation_example.ReviewService listening on {}",
addr
);
let handle = tokio::spawn(async move {
Server::builder()
.add_service(ReviewServiceServer::new(service.clone()))
.serve(addr)
.await
.map_err(|e| grpc_graphql_gateway::Error::Other(anyhow::Error::new(e)))
});
handles.push(handle);
}
{
let addr: SocketAddr = listen_addr("localhost:50051", DEFAULT_GRPC_ADDR)?;
let service = ServiceImpl::default();
tracing::info!(
"gRPC service federation_example.UserService listening on {}",
addr
);
let handle = tokio::spawn(async move {
Server::builder()
.add_service(UserServiceServer::new(service.clone()))
.serve(addr)
.await
.map_err(|e| grpc_graphql_gateway::Error::Other(anyhow::Error::new(e)))
});
handles.push(handle);
}
for handle in handles {
match handle.await {
Ok(Ok(())) => {}
Ok(Err(e)) => {
tracing::warn!(error = %e, "gRPC service task exited with error");
}
Err(e) => {
tracing::warn!(error = %e, "gRPC service task panicked or was cancelled");
}
}
}
Ok(())
}
struct ExamplePlugin;
#[async_trait::async_trait]
impl grpc_graphql_gateway::plugin::Plugin for ExamplePlugin {
type Error = Box<dyn std::error::Error + Send + Sync>;
fn name(&self) -> &str {
"ExamplePlugin"
}
async fn on_request(
&self,
_ctx: &grpc_graphql_gateway::middleware::Context,
req: &async_graphql::Request,
) -> Result<(), Self::Error> {
println!("Plugin: Received GraphQL request via on_request hook");
println!(
" - Query: {}",
req.query.chars().take(50).collect::<String>()
);
Ok(())
}
async fn on_response(
&self,
_ctx: &grpc_graphql_gateway::middleware::Context,
res: &async_graphql::Response,
) -> Result<(), Self::Error> {
println!("Plugin: Sending GraphQL response via on_response hook");
if !res.errors.is_empty() {
println!(" - Errors: {:?}", res.errors);
}
Ok(())
}
async fn on_schema_build(
&self,
_builder: &mut grpc_graphql_gateway::schema::SchemaBuilder,
) -> Result<(), Self::Error> {
println!("Plugin: Schema is being built via on_schema_build hook");
Ok(())
}
async fn on_subgraph_request(
&self,
service_name: &str,
metadata: &mut tonic::metadata::MetadataMap,
) -> Result<(), Self::Error> {
println!(
"Plugin: Sending gRPC request to subgraph '{}'",
service_name
);
metadata.insert("x-plugin-header", "example-value".parse().unwrap());
Ok(())
}
}
pub fn gateway_builder() -> GatewayResult<GatewayBuilder> {
let mut builder = Gateway::builder()
.with_descriptor_set_bytes(DESCRIPTOR_SET)
.register_plugin(ExamplePlugin);
if FEDERATION_ENABLED {
tracing::info!(
"Federation enabled (entities: {entities})",
entities = describe_entities()
);
builder = builder
.enable_federation()
.with_entity_resolver(default_entity_resolver());
}
for svc in services::ALL {
tracing::info!(
"{svc} -> {endpoint} (queries: {queries}; mutations: {mutations}; subscriptions: {subscriptions}; resolvers: {resolvers})",
svc = svc.name,
endpoint = svc.endpoint,
queries = describe(svc.queries),
mutations = describe(svc.mutations),
subscriptions = describe(svc.subscriptions),
resolvers = describe_resolvers(svc.resolvers),
);
let client = GrpcClient::builder(svc.endpoint)
.insecure(svc.insecure)
.lazy(true)
.connect_lazy()?;
builder = builder.add_grpc_client(svc.name, client);
}
Ok(builder)
}
pub fn gateway_builder_for_service(svc: &ServiceConfig) -> GatewayResult<GatewayBuilder> {
let mut builder = Gateway::builder().with_descriptor_set_bytes(DESCRIPTOR_SET);
tracing::info!(
"{svc} -> {endpoint} (queries: {queries}; mutations: {mutations}; subscriptions: {subscriptions}; resolvers: {resolvers})",
svc = svc.name,
endpoint = svc.endpoint,
queries = describe(svc.queries),
mutations = describe(svc.mutations),
subscriptions = describe(svc.subscriptions),
resolvers = describe_resolvers(svc.resolvers),
);
if FEDERATION_ENABLED {
builder = builder
.enable_federation()
.with_entity_resolver(default_entity_resolver())
.with_services([svc.name]);
} else {
builder = builder.with_services([svc.name]);
}
let client = GrpcClient::builder(svc.endpoint)
.insecure(svc.insecure)
.lazy(true)
.connect_lazy()?;
builder = builder.add_grpc_client(svc.name, client);
Ok(builder)
}
pub fn gateway_builder_for(name: &str) -> GatewayResult<Option<GatewayBuilder>> {
for svc in services::ALL {
if svc.name == name {
return gateway_builder_for_service(svc).map(Some);
}
}
Ok(None)
}
pub async fn run_federation_example_productservice_gateway() -> GatewayResult<()> {
let gateway =
gateway_builder_for_service(&services::FEDERATION_EXAMPLE_PRODUCTSERVICE)?.build()?;
let app = gateway.into_router();
let listener = TcpListener::bind("0.0.0.0:9000")
.await
.map_err(|e| grpc_graphql_gateway::Error::Other(anyhow::Error::new(e)))?;
axum::serve(listener, app)
.await
.map_err(|e| grpc_graphql_gateway::Error::Other(anyhow::Error::new(e)))
}
pub async fn run_federation_example_reviewservice_gateway() -> GatewayResult<()> {
let gateway =
gateway_builder_for_service(&services::FEDERATION_EXAMPLE_REVIEWSERVICE)?.build()?;
let app = gateway.into_router();
let listener = TcpListener::bind("0.0.0.0:9001")
.await
.map_err(|e| grpc_graphql_gateway::Error::Other(anyhow::Error::new(e)))?;
axum::serve(listener, app)
.await
.map_err(|e| grpc_graphql_gateway::Error::Other(anyhow::Error::new(e)))
}
pub async fn run_federation_example_userservice_gateway() -> GatewayResult<()> {
let gateway =
gateway_builder_for_service(&services::FEDERATION_EXAMPLE_USERSERVICE)?.build()?;
let app = gateway.into_router();
let listener = TcpListener::bind("0.0.0.0:9002")
.await
.map_err(|e| grpc_graphql_gateway::Error::Other(anyhow::Error::new(e)))?;
axum::serve(listener, app)
.await
.map_err(|e| grpc_graphql_gateway::Error::Other(anyhow::Error::new(e)))
}
#[tokio::main]
async fn main() -> GatewayResult<()> {
tracing_subscriber::registry()
.with(tracing_subscriber::fmt::layer())
.init();
tracing::info!(
"GraphQL operations -> queries: {queries}; mutations: {mutations}; subscriptions: {subscriptions}; resolvers: {resolvers}",
queries = describe(QUERIES),
mutations = describe(MUTATIONS),
subscriptions = describe(SUBSCRIPTIONS),
resolvers = describe_resolvers(RESOLVERS),
);
if FEDERATION_ENABLED {
tracing::info!(
"Federation entities -> {entities}",
entities = describe_entities()
);
}
tokio::spawn(async {
if let Err(e) = run_services().await {
tracing::error!(error = %e, "gRPC services exited with error");
}
});
tokio::spawn(async {
if let Err(e) = run_federation_example_productservice_gateway().await {
tracing::error!(error = %e, "GraphQL gateway for federation_example.ProductService exited with error");
}
});
tokio::spawn(async {
if let Err(e) = run_federation_example_reviewservice_gateway().await {
tracing::error!(error = %e, "GraphQL gateway for federation_example.ReviewService exited with error");
}
});
tokio::spawn(async {
if let Err(e) = run_federation_example_userservice_gateway().await {
tracing::error!(error = %e, "GraphQL gateway for federation_example.UserService exited with error");
}
});
tokio::signal::ctrl_c()
.await
.map_err(|e| grpc_graphql_gateway::Error::Other(anyhow::Error::new(e)))?;
Ok(())
}