use std::path::PathBuf;
use std::pin::Pin;
use std::time::Duration;
use derivative::Derivative;
use derive_more::Display;
use derive_more::From;
use futures::prelude::*;
use url::Url;
use crate::registry::OciConfig;
use crate::registry::create_oci_schema_stream;
use crate::router::Event;
use crate::router::Event::NoMoreSchema;
use crate::router::Event::UpdateSchema;
use crate::uplink::UplinkConfig;
use crate::uplink::schema::SchemaState;
use crate::uplink::schema_stream::SupergraphSdlQuery;
use crate::uplink::stream_from_uplink;
/// A pinned, boxed, `Send` stream whose items are schema SDL strings.
type SchemaStream = Pin<Box<dyn Stream<Item = String> + Send>>;
/// The places a supergraph schema can be obtained from.
///
/// `into_stream` consumes a source and turns it into router schema events.
/// `Display` prints a short fixed label per variant (see the `display`
/// attributes); `Debug` is derived through `derivative`.
#[derive(From, Display, Derivative)]
#[derivative(Debug)]
#[non_exhaustive]
pub enum SchemaSource {
/// A fixed schema supplied directly as SDL text; `into_stream` emits it once.
#[display("String")]
Static { schema_sdl: String },
/// A caller-provided stream of SDL strings; every item becomes a schema update.
/// The stream itself is left out of the `Debug` output.
#[display("Stream")]
Stream(#[derivative(Debug = "ignore")] SchemaStream),
/// A schema read from a file on disk.
#[display("File")]
File {
/// Location of the schema file.
path: PathBuf,
/// When `true`, the file is re-read each time `crate::files::watch` reports an
/// event for `path`; when `false`, it is read a single time.
watch: bool,
},
/// Schema updates streamed via `stream_from_uplink` using the given configuration.
#[display("Registry")]
Registry(UplinkConfig),
/// Candidate URLs, tried in order; the schema comes from the first one that
/// answers successfully.
#[display("URLs")]
URLs {
/// The URLs to try, in priority order.
urls: Vec<Url>,
},
/// Schema updates streamed via `create_oci_schema_stream` using the given configuration.
// NOTE(review): this variant displays as "Registry", the same label as the
// `Registry` variant above — confirm this is intentional and not a copy-paste.
#[display("Registry")]
OCI(OciConfig),
}
/// Builds a static schema source from borrowed SDL text.
///
/// The text is copied into an owned `String`, so the resulting
/// `SchemaSource::Static` does not borrow from `s`.
impl From<&'_ str> for SchemaSource {
    fn from(s: &'_ str) -> Self {
        let schema_sdl = String::from(s);
        Self::Static { schema_sdl }
    }
}
impl SchemaSource {
pub(crate) fn into_stream(self) -> impl Stream<Item = Event> {
match self {
SchemaSource::Static { schema_sdl: schema } => {
let update_schema = UpdateSchema(SchemaState {
sdl: schema,
launch_id: None,
});
stream::once(future::ready(update_schema)).boxed()
}
SchemaSource::Stream(stream) => stream
.map(|sdl| {
UpdateSchema(SchemaState {
sdl,
launch_id: None,
})
})
.boxed(),
SchemaSource::File {
path,
watch,
} => {
if !path.exists() {
tracing::error!(
"Supergraph schema at path '{}' does not exist.",
path.to_string_lossy()
);
stream::empty().boxed()
} else {
match std::fs::read_to_string(&path) {
Ok(schema) => {
if watch {
crate::files::watch(&path)
.filter_map(move |_| {
let path = path.clone();
async move {
match tokio::fs::read_to_string(&path).await {
Ok(schema) => {
let update_schema = UpdateSchema(SchemaState {
sdl: schema,
launch_id: None,
});
Some(update_schema)
}
Err(err) => {
tracing::error!(reason = %err, "failed to read supergraph schema");
None
}
}
}
})
.boxed()
} else {
let update_schema = UpdateSchema(SchemaState {
sdl: schema,
launch_id: None,
});
stream::once(future::ready(update_schema)).boxed()
}
}
Err(err) => {
tracing::error!(reason = %err, "failed to read supergraph schema");
stream::empty().boxed()
}
}
}
}
SchemaSource::Registry(uplink_config) => {
stream_from_uplink::<SupergraphSdlQuery, SchemaState>(uplink_config)
.filter_map(|res| {
future::ready(match res {
Ok(schema) => {
let update_schema = UpdateSchema(schema);
Some(update_schema)
}
Err(e) => {
tracing::error!("{}", e);
None
}
})
})
.boxed()
}
SchemaSource::URLs { urls } => {
futures::stream::once(async move {
fetch_supergraph_from_first_viable_url(&urls).await
})
.filter_map(|s| async move { s.map(Event::UpdateSchema) })
.boxed()
}
SchemaSource::OCI(oci_config) => {
tracing::debug!("using oci as schema source");
match create_oci_schema_stream(oci_config) {
Ok(stream) => Pin::new(Box::new(stream))
.filter_map(|res| {
future::ready(match res {
Ok(schema) => {
let update_schema = UpdateSchema(schema);
Some(update_schema)
}
Err(e) => {
tracing::error!("{}", e);
None
}
})
})
.boxed(),
Err(e) => {
tracing::error!("failed to create OCI schema stream: {}", e);
stream::empty().boxed()
}
}
}
}
.chain(stream::iter(vec![NoMoreSchema]))
.boxed()
}
}
async fn fetch_supergraph_from_first_viable_url(urls: &[Url]) -> Option<SchemaState> {
let Ok(client) = reqwest::Client::builder()
.no_gzip()
.timeout(Duration::from_secs(10))
.build()
else {
tracing::error!("failed to create HTTP client to fetch supergraph schema");
return None;
};
for url in urls {
match client
.get(reqwest::Url::parse(url.as_ref()).unwrap())
.send()
.await
{
Ok(res) if res.status().is_success() => match res.text().await {
Ok(schema) => {
return Some(SchemaState {
sdl: schema,
launch_id: None,
});
}
Err(err) => {
tracing::warn!(
url.full = %url,
reason = %err,
"failed to fetch supergraph schema"
)
}
},
Ok(res) => tracing::warn!(
http.response.status_code = res.status().as_u16(),
url.full = %url,
"failed to fetch supergraph schema"
),
Err(err) => tracing::warn!(
url.full = %url,
reason = %err,
"failed to fetch supergraph schema"
),
}
}
tracing::error!("failed to fetch supergraph schema from all urls");
None
}
#[cfg(test)]
mod tests {
    use std::env::temp_dir;

    use test_log::test;
    use tracing_futures::WithSubscriber;
    use wiremock::Mock;
    use wiremock::MockServer;
    use wiremock::ResponseTemplate;
    use wiremock::matchers::method;
    use wiremock::matchers::path;

    use super::*;
    use crate::assert_snapshot_subscriber;
    use crate::files::tests::create_temp_file;
    use crate::files::tests::write_and_flush;

    const SCHEMA_1: &str = "schema1";
    const SCHEMA_2: &str = "schema2";

    /// Registers a mock on `server` that answers `GET <route>` with `response`.
    async fn mount_get(server: &MockServer, route: &str, response: ResponseTemplate) {
        Mock::given(method("GET"))
            .and(path(route))
            .respond_with(response)
            .mount(server)
            .await;
    }

    /// Builds a URL-based source pointing at `/schema1` then `/schema2` on `server`.
    fn url_source(server: &MockServer) -> SchemaSource {
        let endpoint =
            |route: &str| Url::parse(&format!("http://{}{}", server.address(), route)).unwrap();
        SchemaSource::URLs {
            urls: vec![endpoint("/schema1"), endpoint("/schema2")],
        }
    }

    /// A watched file yields an update initially and another one after it is rewritten.
    #[test(tokio::test)]
    async fn schema_by_file_watching() {
        let (path, mut file) = create_temp_file();
        write_and_flush(
            &mut file,
            include_str!("../../testdata/supergraph.graphql"),
        )
        .await;

        let mut events = SchemaSource::File { path, watch: true }
            .into_stream()
            .boxed();
        assert!(matches!(events.next().await, Some(UpdateSchema(_))));

        // Rewriting the file must trigger a second update.
        write_and_flush(
            &mut file,
            include_str!("../../testdata/minimal_supergraph.graphql"),
        )
        .await;
        assert!(matches!(events.next().await, Some(UpdateSchema(_))));
    }

    /// An unwatched file yields exactly one update followed by the end marker.
    #[test(tokio::test)]
    async fn schema_by_file_no_watch() {
        let (path, mut file) = create_temp_file();
        write_and_flush(
            &mut file,
            include_str!("../../testdata/supergraph.graphql"),
        )
        .await;

        let mut events = SchemaSource::File { path, watch: false }.into_stream();
        assert!(matches!(events.next().await, Some(UpdateSchema(_))));
        assert!(matches!(events.next().await, Some(NoMoreSchema)));
    }

    /// A missing file yields only the end marker.
    #[test(tokio::test)]
    async fn schema_by_file_missing() {
        let missing = SchemaSource::File {
            path: temp_dir().join("does_not_exist"),
            watch: true,
        };
        let mut events = missing.into_stream();
        assert!(matches!(events.next().await, Some(NoMoreSchema)));
    }

    /// When both URLs work, the schema comes from the first one.
    #[test(tokio::test)]
    async fn schema_by_url() {
        async {
            let mock_server = MockServer::start().await;
            mount_get(
                &mock_server,
                "/schema1",
                ResponseTemplate::new(200).set_body_string(SCHEMA_1),
            )
            .await;
            mount_get(
                &mock_server,
                "/schema2",
                ResponseTemplate::new(200).set_body_string(SCHEMA_2),
            )
            .await;

            let mut events = url_source(&mock_server).into_stream();
            assert!(
                matches!(events.next().await, Some(UpdateSchema(state)) if state.sdl == SCHEMA_1)
            );
            assert!(matches!(events.next().await, Some(NoMoreSchema)));
        }
        .with_subscriber(assert_snapshot_subscriber!())
        .await;
    }

    /// When the first URL fails, the schema comes from the second one.
    #[test(tokio::test)]
    async fn schema_by_url_fallback() {
        async {
            let mock_server = MockServer::start().await;
            mount_get(&mock_server, "/schema1", ResponseTemplate::new(400)).await;
            mount_get(
                &mock_server,
                "/schema2",
                ResponseTemplate::new(200).set_body_string(SCHEMA_2),
            )
            .await;

            let mut events = url_source(&mock_server).into_stream();
            assert!(
                matches!(events.next().await, Some(UpdateSchema(state)) if state.sdl == SCHEMA_2)
            );
            assert!(matches!(events.next().await, Some(NoMoreSchema)));
        }
        .with_subscriber(assert_snapshot_subscriber!({
            "[].fields[\"url.full\"]" => "[url.full]"
        }))
        .await;
    }

    /// When every URL fails, only the end marker is emitted.
    #[test(tokio::test)]
    async fn schema_by_url_all_fail() {
        async {
            let mock_server = MockServer::start().await;
            mount_get(&mock_server, "/schema1", ResponseTemplate::new(400)).await;
            mount_get(&mock_server, "/schema2", ResponseTemplate::new(400)).await;

            let mut events = url_source(&mock_server).into_stream();
            assert!(matches!(events.next().await, Some(NoMoreSchema)));
        }
        .with_subscriber(assert_snapshot_subscriber!({
            "[].fields[\"url.full\"]" => "[url.full]"
        }))
        .await;
    }
}