#![allow(missing_docs)]
use std::fs;
use std::net::IpAddr;
use std::path::Path;
use std::path::PathBuf;
use std::pin::Pin;
use std::sync::Arc;
use std::task::Context;
use std::task::Poll;
use std::time::Duration;
use derivative::Derivative;
use derive_more::Display;
use derive_more::From;
use displaydoc::Display as DisplayDoc;
use futures::channel::oneshot;
use futures::prelude::*;
use futures::FutureExt;
use http_body::Body as _;
use hyper::Body;
use thiserror::Error;
use tokio::sync::RwLock;
use tokio::task::spawn;
use tower::BoxError;
use tower::ServiceExt;
use tracing_futures::WithSubscriber;
use url::Url;
use self::Event::NoMoreConfiguration;
use self::Event::NoMoreSchema;
use self::Event::Shutdown;
use self::Event::UpdateConfiguration;
use self::Event::UpdateSchema;
use crate::axum_http_server_factory::make_axum_router;
use crate::axum_http_server_factory::AxumHttpServerFactory;
use crate::axum_http_server_factory::ListenAddrAndRouter;
use crate::configuration::Configuration;
use crate::configuration::ListenAddr;
use crate::plugin::DynPlugin;
use crate::router_factory::SupergraphServiceConfigurator;
use crate::router_factory::SupergraphServiceFactory;
use crate::router_factory::YamlSupergraphServiceFactory;
use crate::services::transport;
use crate::spec::Schema;
use crate::state_machine::StateMachine;
/// A boxed, sendable stream of schema SDL strings (used by `SchemaSource::Stream`).
type SchemaStream = Pin<Box<dyn Stream<Item = String> + Send>>;

/// Builds a transport-level (HTTP) tower service from a schema string and a
/// configuration, without binding a TCP listener.
///
/// The supergraph service is created via `YamlSupergraphServiceFactory`,
/// wrapped into the axum router, and the main router is returned as a boxed,
/// clone-able service. `extra_plugins` are handed to the factory in addition
/// to the plugins derived from `configuration`.
///
/// # Errors
/// Fails if the schema does not parse, the service factory cannot be created,
/// or the axum router cannot be built.
#[allow(unused)]
async fn make_transport_service<RF>(
    schema: &str,
    configuration: Arc<Configuration>,
    extra_plugins: Vec<(String, Box<dyn DynPlugin>)>,
) -> Result<transport::BoxCloneService, BoxError> {
    let schema = Arc::new(Schema::parse(schema, &configuration)?);
    let service_factory = YamlSupergraphServiceFactory
        .create(configuration.clone(), schema, None, Some(extra_plugins))
        .await?;
    let web_endpoints = service_factory.web_endpoints();
    let routers = make_axum_router(service_factory, &configuration, web_endpoints)?;
    // Only the main router is used here; its listener is deliberately discarded.
    let ListenAddrAndRouter(_listener, router) = routers.main;
    Ok(router
        .map_response(|response| {
            response.map(|body| {
                // Adapt the router's response body into a hyper `Body` by
                // polling it as a stream of data frames.
                let mut body = Box::pin(body);
                Body::wrap_stream(stream::poll_fn(move |ctx| body.as_mut().poll_data(ctx)))
            })
        })
        // The router's error type is uninhabited; this empty match proves that
        // no error value can ever be produced.
        .map_err(|error| match error {})
        .boxed_clone())
}
/// Error types produced by the router lifecycle.
///
/// `displaydoc::Display` derives the `Display` implementation from each
/// variant's doc comment, so the comments below double as the user-facing
/// error messages; without them the derive cannot generate `Display`.
#[derive(Error, Debug, DisplayDoc)]
pub enum ApolloRouterError {
    /// failed to start server
    StartupError,

    /// failed to stop HTTP Server
    HttpServerLifecycleError,

    /// no valid configuration was supplied
    NoConfiguration,

    /// no valid schema was supplied
    NoSchema,

    /// could not create router: {0}
    ServiceCreationError(BoxError),

    /// could not create the HTTP server: {0}
    ServerCreationError(std::io::Error),

    /// tried to bind {0} and {1} on the same port {2}
    DifferentListenAddrsOnSamePort(IpAddr, IpAddr, u16),

    /// tried to register two endpoints on `{0}:{1}{2}`
    SameRouteUsedTwice(IpAddr, u16, String),
}
/// The source from which the router obtains its supergraph schema.
#[derive(From, Display, Derivative)]
#[derivative(Debug)]
#[non_exhaustive]
pub enum SchemaSource {
    /// A fixed schema supplied as an SDL string.
    #[display(fmt = "String")]
    Static { schema_sdl: String },
    /// A stream of SDL strings; each item becomes a schema update.
    #[display(fmt = "Stream")]
    Stream(#[derivative(Debug = "ignore")] SchemaStream),
    /// A schema read from a file, optionally re-read when the file changes.
    #[display(fmt = "File")]
    File {
        /// Path of the schema file.
        path: PathBuf,
        /// When true, watch the file and re-read it on change notifications.
        watch: bool,
        /// Optional delay forwarded to the file watcher — semantics defined
        /// by `crate::files::watch`; confirm there before relying on it.
        delay: Option<Duration>,
    },
    /// A schema polled from the Apollo Uplink registry.
    #[display(fmt = "Registry")]
    Registry {
        /// Apollo key used to authenticate against Uplink.
        apollo_key: String,
        /// Graph reference identifying which supergraph to fetch.
        apollo_graph_ref: String,
        /// Optional override of the Uplink endpoint URLs.
        urls: Option<Vec<Url>>,
        /// How often to poll Uplink for a new schema.
        poll_interval: Duration,
    },
}
impl From<&'_ str> for SchemaSource {
    /// Treats a plain string slice as a static SDL schema.
    fn from(s: &'_ str) -> Self {
        let schema_sdl = s.to_string();
        Self::Static { schema_sdl }
    }
}
impl SchemaSource {
    /// Converts this schema source into a stream of state-machine [`Event`]s.
    ///
    /// Each variant yields zero or more `UpdateSchema` events; every stream is
    /// terminated by a single `NoMoreSchema` event (appended via `chain`).
    fn into_stream(self) -> impl Stream<Item = Event> {
        match self {
            // One-shot: emit the provided SDL once.
            SchemaSource::Static { schema_sdl: schema } => {
                stream::once(future::ready(UpdateSchema(schema))).boxed()
            }
            // Forward every SDL string from the caller-provided stream.
            SchemaSource::Stream(stream) => stream.map(UpdateSchema).boxed(),
            SchemaSource::File { path, watch, delay } => {
                if !path.exists() {
                    // Missing file: log and emit no updates; the chained
                    // NoMoreSchema still fires so consumers can react.
                    tracing::error!(
                        "Schema file at path '{}' does not exist.",
                        path.to_string_lossy()
                    );
                    stream::empty().boxed()
                } else {
                    match std::fs::read_to_string(&path) {
                        Ok(schema) => {
                            if watch {
                                // Re-read the file on every watcher
                                // notification; unreadable re-reads are
                                // silently skipped (filter_map on Ok).
                                crate::files::watch(path.to_owned(), delay)
                                    .filter_map(move |_| {
                                        future::ready(std::fs::read_to_string(&path).ok())
                                    })
                                    .map(UpdateSchema)
                                    .boxed()
                            } else {
                                // No watching: emit the initial read once.
                                stream::once(future::ready(UpdateSchema(schema))).boxed()
                            }
                        }
                        Err(err) => {
                            tracing::error!("Failed to read schema: {}", err);
                            stream::empty().boxed()
                        }
                    }
                }
            }
            SchemaSource::Registry {
                apollo_key,
                apollo_graph_ref,
                urls,
                poll_interval,
            } => {
                // Poll Uplink for schema updates; download errors are logged
                // and skipped rather than terminating the stream.
                crate::uplink::stream_supergraph(apollo_key, apollo_graph_ref, urls, poll_interval)
                    .filter_map(|res| {
                        future::ready(match res {
                            Ok(schema_result) => Some(UpdateSchema(schema_result.schema)),
                            Err(e) => {
                                tracing::error!(
                                    "error downloading the schema from Uplink: {:?}",
                                    e
                                );
                                None
                            }
                        })
                    })
                    .boxed()
            }
        }
        .chain(stream::iter(vec![NoMoreSchema]))
    }
}
/// A boxed, sendable stream of `Configuration`s (used by `ConfigurationSource::Stream`).
type ConfigurationStream = Pin<Box<dyn Stream<Item = Configuration> + Send>>;

/// The source from which the router obtains its configuration.
#[derive(From, Display, Derivative)]
#[derivative(Debug)]
#[non_exhaustive]
pub enum ConfigurationSource {
    /// A fixed configuration instance (boxed; `From<Configuration>` also works).
    #[display(fmt = "Static")]
    #[from(types(Configuration))]
    Static(Box<Configuration>),
    /// A stream of configurations; each item becomes a configuration update.
    #[display(fmt = "Stream")]
    Stream(#[derivative(Debug = "ignore")] ConfigurationStream),
    /// A configuration file, optionally re-read when the file changes.
    #[display(fmt = "File")]
    File {
        /// Path of the configuration file.
        path: PathBuf,
        /// When true, watch the file and re-read it on change notifications.
        watch: bool,
        /// Optional delay forwarded to the file watcher — semantics defined
        /// by `crate::files::watch`; confirm there before relying on it.
        delay: Option<Duration>,
    },
}
impl Default for ConfigurationSource {
fn default() -> Self {
ConfigurationSource::Static(Default::default())
}
}
impl ConfigurationSource {
    /// Converts this configuration source into a stream of state-machine [`Event`]s.
    ///
    /// Each variant yields zero or more `UpdateConfiguration` events; every
    /// stream is terminated by a single `NoMoreConfiguration` event.
    fn into_stream(self) -> impl Stream<Item = Event> {
        match self {
            // One-shot: emit the provided configuration once.
            ConfigurationSource::Static(instance) => {
                stream::iter(vec![UpdateConfiguration(instance)]).boxed()
            }
            // Forward every configuration from the caller-provided stream.
            ConfigurationSource::Stream(stream) => {
                stream.map(|x| UpdateConfiguration(Box::new(x))).boxed()
            }
            ConfigurationSource::File { path, watch, delay } => {
                if !path.exists() {
                    // Missing file: log and emit no updates; the chained
                    // NoMoreConfiguration still fires.
                    tracing::error!(
                        "configuration file at path '{}' does not exist.",
                        path.to_string_lossy()
                    );
                    stream::empty().boxed()
                } else if watch {
                    // Re-read on every watcher notification. NOTE: a failed
                    // re-read maps to NoMoreConfiguration inside the stream
                    // (before the chained terminal one), unlike the one-shot
                    // branch which simply ends the stream.
                    crate::files::watch(path.to_owned(), delay)
                        .map(move |_| match ConfigurationSource::read_config(&path) {
                            Ok(config) => UpdateConfiguration(Box::new(config)),
                            Err(err) => {
                                tracing::error!("{}", err);
                                NoMoreConfiguration
                            }
                        })
                        .boxed()
                } else {
                    // No watching: read once; on failure, log and emit nothing.
                    match ConfigurationSource::read_config(&path) {
                        Ok(configuration) => stream::once(future::ready(UpdateConfiguration(
                            Box::new(configuration),
                        )))
                        .boxed(),
                        Err(err) => {
                            tracing::error!("{}", err);
                            stream::empty().boxed()
                        }
                    }
                }
            }
        }
        .chain(stream::iter(vec![NoMoreConfiguration]))
        .boxed()
    }

    /// Reads and parses a configuration file from `path`.
    ///
    /// # Errors
    /// Returns `ReadConfigError::Io` if the file cannot be read, or
    /// `ReadConfigError::Validation` if the contents do not parse as a
    /// valid `Configuration`.
    fn read_config(path: &Path) -> Result<Configuration, ReadConfigError> {
        let config = fs::read_to_string(path)?;
        config.parse().map_err(ReadConfigError::Validation)
    }
}
/// Errors raised while loading a configuration file in `read_config`.
#[derive(From, Display)]
enum ReadConfigError {
    /// The file could not be read.
    Io(std::io::Error),
    /// The file contents did not parse/validate as a `Configuration`.
    Validation(crate::configuration::ConfigurationError),
}
/// A boxed future used by `ShutdownSource::Custom`; its resolution triggers shutdown.
type ShutdownFuture = Pin<Box<dyn Future<Output = ()> + Send>>;

/// The source of the shutdown signal for the router.
#[derive(Display, Derivative)]
#[derivative(Debug)]
#[non_exhaustive]
pub enum ShutdownSource {
    /// Never shut down from an external signal (only via explicit
    /// `shutdown()` or by dropping the handle).
    #[display(fmt = "None")]
    None,
    /// Shut down when the supplied future resolves.
    #[display(fmt = "Custom")]
    Custom(#[derivative(Debug = "ignore")] ShutdownFuture),
    /// Shut down on CTRL+C (and additionally on SIGTERM on unix).
    #[display(fmt = "CtrlC")]
    CtrlC,
}
impl ShutdownSource {
    /// Converts this shutdown source into a stream that yields a single
    /// [`Event::Shutdown`] when the shutdown condition fires (or never, for
    /// `ShutdownSource::None`).
    fn into_stream(self) -> impl Stream<Item = Event> {
        match self {
            // Never yields: shutdown can only come from elsewhere.
            ShutdownSource::None => stream::pending::<Event>().boxed(),
            // Yield Shutdown once the custom future resolves.
            ShutdownSource::Custom(future) => future.map(|_| Shutdown).into_stream().boxed(),
            ShutdownSource::CtrlC => {
                // Non-unix platforms: only CTRL+C is available.
                #[cfg(not(unix))]
                {
                    async {
                        tokio::signal::ctrl_c()
                            .await
                            .expect("Failed to install CTRL+C signal handler");
                    }
                    .map(|_| Shutdown)
                    .into_stream()
                    .boxed()
                }
                // Unix: react to whichever of CTRL+C / SIGTERM arrives first.
                #[cfg(unix)]
                future::select(
                    tokio::signal::ctrl_c().map(|s| s.ok()).boxed(),
                    async {
                        tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
                            .expect("Failed to install SIGTERM signal handler")
                            .recv()
                            .await
                    }
                    .boxed(),
                )
                .map(|_| Shutdown)
                .into_stream()
                .boxed()
            }
        }
    }
}
/// Handle to a running router.
///
/// The handle implements `Future`: awaiting it resolves when the router
/// terminates. Dropping the handle sends the shutdown signal (see the `Drop`
/// impl), as does calling `shutdown()`.
pub struct RouterHttpServer {
    /// Future resolving with the state machine's final result.
    result: Pin<Box<dyn Future<Output = Result<(), ApolloRouterError>> + Send>>,
    /// Address the GraphQL endpoint is listening on, once known.
    graphql_listen_address: Arc<RwLock<Option<ListenAddr>>>,
    /// Additional listen addresses. NOTE(review): "adresses" spelling is kept
    /// because the public accessor `extra_listen_adresses()` uses it.
    extra_listen_adresses: Arc<RwLock<Vec<ListenAddr>>>,
    /// One-shot channel used to request shutdown; `None` once the signal
    /// has already been sent.
    shutdown_sender: Option<oneshot::Sender<()>>,
}
#[buildstructor::buildstructor]
impl RouterHttpServer {
    /// Starts the router from the given sources and returns a handle to it.
    ///
    /// `buildstructor` turns this into `RouterHttpServer::builder()` with a
    /// `.start()` exit method. `configuration` defaults to
    /// `ConfigurationSource::default()` and `shutdown` defaults to
    /// `ShutdownSource::CtrlC` when not supplied.
    #[builder(visibility = "pub", entry = "builder", exit = "start")]
    fn start(
        schema: SchemaSource,
        configuration: Option<ConfigurationSource>,
        shutdown: Option<ShutdownSource>,
    ) -> RouterHttpServer {
        // Channel used by shutdown()/Drop to inject a Shutdown event.
        let (shutdown_sender, shutdown_receiver) = oneshot::channel::<()>();
        let event_stream = generate_event_stream(
            shutdown.unwrap_or(ShutdownSource::CtrlC),
            configuration.unwrap_or_default(),
            schema,
            shutdown_receiver,
        );
        let server_factory = AxumHttpServerFactory::new();
        let router_factory = YamlSupergraphServiceFactory::default();
        let state_machine = StateMachine::new(server_factory, router_factory);
        // Shared handles to the listen addresses, readable from this handle
        // while the state machine runs on its own task.
        let extra_listen_adresses = state_machine.extra_listen_adresses.clone();
        let graphql_listen_address = state_machine.graphql_listen_address.clone();
        // Run the state machine on a spawned task, keeping the caller's
        // tracing subscriber attached.
        let result = spawn(
            async move { state_machine.process_events(event_stream).await }
                .with_current_subscriber(),
        )
        .map(|r| match r {
            Ok(Ok(ok)) => Ok(ok),
            Ok(Err(err)) => Err(err),
            // The spawned task panicked or was cancelled (JoinError):
            // surface it as a startup error.
            Err(err) => {
                tracing::error!("{}", err);
                Err(ApolloRouterError::StartupError)
            }
        })
        .with_current_subscriber()
        .boxed();
        RouterHttpServer {
            result,
            shutdown_sender: Some(shutdown_sender),
            graphql_listen_address,
            extra_listen_adresses,
        }
    }

    /// Returns the GraphQL listen address once the server has started,
    /// `None` before that (or if startup failed).
    pub async fn listen_address(&self) -> Option<ListenAddr> {
        self.graphql_listen_address.read().await.clone()
    }

    /// Returns the extra listen addresses currently registered.
    pub async fn extra_listen_adresses(&self) -> Vec<ListenAddr> {
        self.extra_listen_adresses.read().await.clone()
    }

    /// Requests shutdown (if not already requested) and waits for the router
    /// to terminate, returning its final result.
    pub async fn shutdown(&mut self) -> Result<(), ApolloRouterError> {
        if let Some(sender) = self.shutdown_sender.take() {
            // The receiver may already be gone; ignore the send error.
            let _ = sender.send(());
        }
        (&mut self.result).await
    }
}
/// Messages processed by the router state machine.
#[derive(Debug)]
pub(crate) enum Event {
    /// A new configuration is available.
    UpdateConfiguration(Box<Configuration>),
    /// The configuration source is exhausted; no further updates will arrive.
    NoMoreConfiguration,
    /// A new schema (SDL string) is available.
    UpdateSchema(String),
    /// The schema source is exhausted; no further updates will arrive.
    NoMoreSchema,
    /// The router must shut down.
    Shutdown,
}
impl Drop for RouterHttpServer {
    /// Dropping the handle requests shutdown of the running router.
    fn drop(&mut self) {
        if let Some(tx) = self.shutdown_sender.take() {
            // The state machine may already have stopped; ignore send failure.
            drop(tx.send(()));
        }
    }
}
impl Future for RouterHttpServer {
    type Output = Result<(), ApolloRouterError>;

    /// Resolves with the router's final result once it has terminated.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Forward polling to the boxed state-machine result future.
        let result_future = &mut self.result;
        result_future.poll_unpin(cx)
    }
}
/// Merges every event source into a single stream for the state machine.
///
/// Events flow until the first `Shutdown` (from any source, including the
/// `shutdown_receiver` channel); the stream then terminates with exactly one
/// `Shutdown` event, which is also appended if all sources simply run dry.
fn generate_event_stream(
    shutdown: ShutdownSource,
    configuration: ConfigurationSource,
    schema: SchemaSource,
    shutdown_receiver: oneshot::Receiver<()>,
) -> impl Stream<Item = Event> {
    let sources = vec![
        shutdown.into_stream().boxed(),
        configuration.into_stream().boxed(),
        schema.into_stream().boxed(),
        shutdown_receiver.into_stream().map(|_| Shutdown).boxed(),
    ];
    stream::select_all(sources)
        // Stop forwarding as soon as the first Shutdown arrives…
        .take_while(|event| future::ready(!matches!(event, Shutdown)))
        // …then always finish with a single terminal Shutdown.
        .chain(stream::once(future::ready(Shutdown)))
        .boxed()
}
#[cfg(test)]
mod tests {
    use std::env::temp_dir;
    use serde_json::json;
    use serde_json::to_string_pretty;
    use serde_json::Value;
    use test_log::test;
    use super::*;
    use crate::files::tests::create_temp_file;
    use crate::files::tests::write_and_flush;
    use crate::graphql;
    use crate::graphql::Request;

    /// Starts a router from the checked-in test configuration and schema.
    fn init_with_server() -> RouterHttpServer {
        let configuration =
            serde_yaml::from_str::<Configuration>(include_str!("testdata/supergraph_config.yaml"))
                .unwrap();
        let schema = include_str!("testdata/supergraph.graphql");
        RouterHttpServer::builder()
            .configuration(configuration)
            .schema(schema)
            .start()
    }

    /// End-to-end smoke test: the router starts, serves one federated query,
    /// and shuts down cleanly.
    #[tokio::test(flavor = "multi_thread")]
    async fn basic_request() {
        let mut router_handle = init_with_server();
        let listen_address = router_handle
            .listen_address()
            .await
            .expect("router failed to start");
        assert_federated_response(&listen_address, r#"{ topProducts { name } }"#).await;
        router_handle.shutdown().await.unwrap();
    }

    /// POSTs the query to the router and asserts a non-empty JSON response.
    async fn assert_federated_response(listen_addr: &ListenAddr, request: &str) {
        let request = Request::builder().query(request).build();
        let expected = query(listen_addr, &request).await.unwrap();
        let response = to_string_pretty(&expected).unwrap();
        assert!(!response.is_empty());
    }

    /// Sends a GraphQL request to the router over HTTP and decodes the
    /// response body as JSON.
    async fn query(
        listen_addr: &ListenAddr,
        request: &graphql::Request,
    ) -> Result<graphql::Response, crate::error::FetchError> {
        Ok(reqwest::Client::new()
            .post(format!("{}/", listen_addr))
            .json(request)
            .send()
            .await
            .expect("couldn't send request")
            .json()
            .await
            .expect("couldn't deserialize into json"))
    }

    /// Watched config file: initial read and each rewrite produce an
    /// `UpdateConfiguration`; garbage contents must not produce one.
    #[tokio::test(flavor = "multi_thread")]
    async fn config_by_file_watching() {
        let (path, mut file) = create_temp_file();
        let contents = include_str!("testdata/supergraph_config.yaml");
        write_and_flush(&mut file, contents).await;
        let mut stream = ConfigurationSource::File {
            path,
            watch: true,
            delay: Some(Duration::from_millis(10)),
        }
        .into_stream()
        .boxed();
        // Initial read.
        assert!(matches!(
            stream.next().await.unwrap(),
            UpdateConfiguration(_)
        ));
        // Rewriting the file triggers another update.
        write_and_flush(&mut file, contents).await;
        assert!(matches!(
            stream.next().await.unwrap(),
            UpdateConfiguration(_)
        ));
        // Invalid contents: no new UpdateConfiguration may be ready.
        write_and_flush(&mut file, ":garbage").await;
        let event = stream.into_future().now_or_never();
        assert!(event.is_none() || matches!(event, Some((Some(NoMoreConfiguration), _))));
    }

    /// Dev mode enables introspection, sandbox, query-plan exposure and
    /// subgraph error propagation, and disables the homepage.
    #[tokio::test(flavor = "multi_thread")]
    async fn config_dev_mode_without_file() {
        let mut stream =
            ConfigurationSource::from(Configuration::builder().dev(true).build().unwrap())
                .into_stream()
                .boxed();
        let cfg = match stream.next().await.unwrap() {
            UpdateConfiguration(configuration) => configuration,
            _ => panic!("the event from the stream must be UpdateConfiguration"),
        };
        assert!(cfg.supergraph.introspection);
        assert!(cfg.sandbox.enabled);
        assert!(!cfg.homepage.enabled);
        assert!(cfg.plugins().iter().any(
            |(name, val)| name == "experimental.expose_query_plan" && val == &Value::Bool(true)
        ));
        assert!(cfg
            .plugins()
            .iter()
            .any(|(name, val)| name == "apollo.include_subgraph_errors"
                && val == &json!({"all": true})));
        cfg.validate().unwrap();
    }

    /// An unparsable config file yields `NoMoreConfiguration` immediately.
    #[tokio::test(flavor = "multi_thread")]
    async fn config_by_file_invalid() {
        let (path, mut file) = create_temp_file();
        write_and_flush(&mut file, "Garbage").await;
        let mut stream = ConfigurationSource::File {
            path,
            watch: true,
            delay: None,
        }
        .into_stream();
        assert!(matches!(stream.next().await.unwrap(), NoMoreConfiguration));
    }

    /// A missing config file yields `NoMoreConfiguration` immediately.
    #[tokio::test(flavor = "multi_thread")]
    async fn config_by_file_missing() {
        let mut stream = ConfigurationSource::File {
            path: temp_dir().join("does_not_exit"),
            watch: true,
            delay: None,
        }
        .into_stream();
        assert!(matches!(stream.next().await.unwrap(), NoMoreConfiguration));
    }

    /// Without watching, the file is read once and the stream then ends.
    #[tokio::test(flavor = "multi_thread")]
    async fn config_by_file_no_watch() {
        let (path, mut file) = create_temp_file();
        let contents = include_str!("testdata/supergraph_config.yaml");
        write_and_flush(&mut file, contents).await;
        let mut stream = ConfigurationSource::File {
            path,
            watch: false,
            delay: None,
        }
        .into_stream();
        assert!(matches!(
            stream.next().await.unwrap(),
            UpdateConfiguration(_)
        ));
        assert!(matches!(stream.next().await.unwrap(), NoMoreConfiguration));
    }

    /// Watched schema file: initial read and each rewrite produce an
    /// `UpdateSchema`.
    #[test(tokio::test)]
    async fn schema_by_file_watching() {
        let (path, mut file) = create_temp_file();
        let schema = include_str!("testdata/supergraph.graphql");
        write_and_flush(&mut file, schema).await;
        let mut stream = SchemaSource::File {
            path,
            watch: true,
            delay: Some(Duration::from_millis(10)),
        }
        .into_stream()
        .boxed();
        assert!(matches!(stream.next().await.unwrap(), UpdateSchema(_)));
        write_and_flush(&mut file, schema).await;
        assert!(matches!(stream.next().await.unwrap(), UpdateSchema(_)));
    }

    /// A missing schema file yields `NoMoreSchema` immediately.
    #[test(tokio::test)]
    async fn schema_by_file_missing() {
        let mut stream = SchemaSource::File {
            path: temp_dir().join("does_not_exit"),
            watch: true,
            delay: None,
        }
        .into_stream();
        assert!(matches!(stream.next().await.unwrap(), NoMoreSchema));
    }

    /// Without watching, the schema is read once and the stream then ends.
    #[test(tokio::test)]
    async fn schema_by_file_no_watch() {
        let (path, mut file) = create_temp_file();
        let schema = include_str!("testdata/supergraph.graphql");
        write_and_flush(&mut file, schema).await;
        let mut stream = SchemaSource::File {
            path,
            watch: false,
            delay: None,
        }
        .into_stream();
        assert!(matches!(stream.next().await.unwrap(), UpdateSchema(_)));
        assert!(matches!(stream.next().await.unwrap(), NoMoreSchema));
    }
}