apollo_router/router/event/
schema.rs1use std::path::PathBuf;
2use std::pin::Pin;
3use std::time::Duration;
4
5use derivative::Derivative;
6use derive_more::Display;
7use derive_more::From;
8use futures::prelude::*;
9use url::Url;
10
11use crate::registry::OciConfig;
12use crate::registry::fetch_oci;
13use crate::router::Event;
14use crate::router::Event::NoMoreSchema;
15use crate::router::Event::UpdateSchema;
16use crate::uplink::UplinkConfig;
17use crate::uplink::schema::SchemaState;
18use crate::uplink::schema_stream::SupergraphSdlQuery;
19use crate::uplink::stream_from_uplink;
20
21type SchemaStream = Pin<Box<dyn Stream<Item = String> + Send>>;
22
23#[derive(From, Display, Derivative)]
25#[derivative(Debug)]
26#[non_exhaustive]
27pub enum SchemaSource {
28 #[display("String")]
30 Static { schema_sdl: String },
31
32 #[display("Stream")]
34 Stream(#[derivative(Debug = "ignore")] SchemaStream),
35
36 #[display("File")]
38 File {
39 path: PathBuf,
41
42 watch: bool,
44 },
45
46 #[display("Registry")]
48 Registry(UplinkConfig),
49
50 #[display("URLs")]
52 URLs {
53 urls: Vec<Url>,
55 },
56
57 #[display("Registry")]
58 OCI(OciConfig),
59}
60
61impl From<&'_ str> for SchemaSource {
62 fn from(s: &'_ str) -> Self {
63 Self::Static {
64 schema_sdl: s.to_owned(),
65 }
66 }
67}
68
69impl SchemaSource {
70 pub(crate) fn into_stream(self) -> impl Stream<Item = Event> {
72 match self {
73 SchemaSource::Static { schema_sdl: schema } => {
74 let update_schema = UpdateSchema(SchemaState {
75 sdl: schema,
76 launch_id: None,
77 });
78 stream::once(future::ready(update_schema)).boxed()
79 }
80 SchemaSource::Stream(stream) => stream
81 .map(|sdl| {
82 UpdateSchema(SchemaState {
83 sdl,
84 launch_id: None,
85 })
86 })
87 .boxed(),
88 SchemaSource::File {
89 path,
90 watch,
91 } => {
92 if !path.exists() {
94 tracing::error!(
95 "Supergraph schema at path '{}' does not exist.",
96 path.to_string_lossy()
97 );
98 stream::empty().boxed()
99 } else {
100 match std::fs::read_to_string(&path) {
102 Ok(schema) => {
103 if watch {
104 crate::files::watch(&path)
105 .filter_map(move |_| {
106 let path = path.clone();
107 async move {
108 match tokio::fs::read_to_string(&path).await {
109 Ok(schema) => {
110 let update_schema = UpdateSchema(SchemaState {
111 sdl: schema,
112 launch_id: None,
113 });
114 Some(update_schema)
115 }
116 Err(err) => {
117 tracing::error!(reason = %err, "failed to read supergraph schema");
118 None
119 }
120 }
121 }
122 })
123 .boxed()
124 } else {
125 let update_schema = UpdateSchema(SchemaState {
126 sdl: schema,
127 launch_id: None,
128 });
129 stream::once(future::ready(update_schema)).boxed()
130 }
131 }
132 Err(err) => {
133 tracing::error!(reason = %err, "failed to read supergraph schema");
134 stream::empty().boxed()
135 }
136 }
137 }
138 }
139 SchemaSource::Registry(uplink_config) => {
140 stream_from_uplink::<SupergraphSdlQuery, SchemaState>(uplink_config)
141 .filter_map(|res| {
142 future::ready(match res {
143 Ok(schema) => {
144 let update_schema = UpdateSchema(schema);
145 Some(update_schema)
146 }
147 Err(e) => {
148 tracing::error!("{}", e);
149 None
150 }
151 })
152 })
153 .boxed()
154 }
155 SchemaSource::URLs { urls } => {
156 futures::stream::once(async move {
157 fetch_supergraph_from_first_viable_url(&urls).await
158 })
159 .filter_map(|s| async move { s.map(Event::UpdateSchema) })
160 .boxed()
161 }
162 SchemaSource::OCI(oci_config) => {
163 tracing::debug!("using oci as schema source");
164 futures::stream::once(async move {
165 match fetch_oci(oci_config).await {
166 Ok(oci_result) => {
167 tracing::debug!("fetched schema from oci registry");
168 Some(SchemaState {
169 sdl: oci_result.schema,
170 launch_id: None,
171 })
172 }
173 Err(err) => {
174 tracing::error!("error fetching schema from oci registry {}", err);
175 None
176 }
177 }
178 })
179 .filter_map(|s| async move { s.map(Event::UpdateSchema) })
180 .boxed()
181 }
182 }
183 .chain(stream::iter(vec![NoMoreSchema]))
184 .boxed()
185 }
186}
187
188async fn fetch_supergraph_from_first_viable_url(urls: &[Url]) -> Option<SchemaState> {
191 let Ok(client) = reqwest::Client::builder()
192 .no_gzip()
193 .timeout(Duration::from_secs(10))
194 .build()
195 else {
196 tracing::error!("failed to create HTTP client to fetch supergraph schema");
197 return None;
198 };
199 for url in urls {
200 match client
201 .get(reqwest::Url::parse(url.as_ref()).unwrap())
202 .send()
203 .await
204 {
205 Ok(res) if res.status().is_success() => match res.text().await {
206 Ok(schema) => {
207 return Some(SchemaState {
208 sdl: schema,
209 launch_id: None,
210 });
211 }
212 Err(err) => {
213 tracing::warn!(
214 url.full = %url,
215 reason = %err,
216 "failed to fetch supergraph schema"
217 )
218 }
219 },
220 Ok(res) => tracing::warn!(
221 http.response.status_code = res.status().as_u16(),
222 url.full = %url,
223 "failed to fetch supergraph schema"
224 ),
225 Err(err) => tracing::warn!(
226 url.full = %url,
227 reason = %err,
228 "failed to fetch supergraph schema"
229 ),
230 }
231 }
232 tracing::error!("failed to fetch supergraph schema from all urls");
233 None
234}
235
236#[cfg(test)]
237mod tests {
238 use std::env::temp_dir;
239
240 use test_log::test;
241 use tracing_futures::WithSubscriber;
242 use wiremock::Mock;
243 use wiremock::MockServer;
244 use wiremock::ResponseTemplate;
245 use wiremock::matchers::method;
246 use wiremock::matchers::path;
247
248 use super::*;
249 use crate::assert_snapshot_subscriber;
250 use crate::files::tests::create_temp_file;
251 use crate::files::tests::write_and_flush;
252
253 #[test(tokio::test)]
254 async fn schema_by_file_watching() {
255 let (path, mut file) = create_temp_file();
256 let schema = include_str!("../../testdata/supergraph.graphql");
257 write_and_flush(&mut file, schema).await;
258 let mut stream = SchemaSource::File { path, watch: true }
259 .into_stream()
260 .boxed();
261
262 assert!(matches!(stream.next().await.unwrap(), UpdateSchema(_)));
264
265 let schema_minimal = include_str!("../../testdata/minimal_supergraph.graphql");
267 write_and_flush(&mut file, schema_minimal).await;
269 assert!(matches!(stream.next().await.unwrap(), UpdateSchema(_)));
270 }
271
272 #[test(tokio::test)]
273 async fn schema_by_file_no_watch() {
274 let (path, mut file) = create_temp_file();
275 let schema = include_str!("../../testdata/supergraph.graphql");
276 write_and_flush(&mut file, schema).await;
277
278 let mut stream = SchemaSource::File { path, watch: false }.into_stream();
279 assert!(matches!(stream.next().await.unwrap(), UpdateSchema(_)));
280 assert!(matches!(stream.next().await.unwrap(), NoMoreSchema));
281 }
282
283 #[test(tokio::test)]
284 async fn schema_by_file_missing() {
285 let mut stream = SchemaSource::File {
286 path: temp_dir().join("does_not_exist"),
287 watch: true,
288 }
289 .into_stream();
290
291 assert!(matches!(stream.next().await.unwrap(), NoMoreSchema));
293 }
294
295 const SCHEMA_1: &str = "schema1";
296 const SCHEMA_2: &str = "schema2";
297 #[test(tokio::test)]
298 async fn schema_by_url() {
299 async {
300 let mock_server = MockServer::start().await;
301 Mock::given(method("GET"))
302 .and(path("/schema1"))
303 .respond_with(ResponseTemplate::new(200).set_body_string(SCHEMA_1))
304 .mount(&mock_server)
305 .await;
306 Mock::given(method("GET"))
307 .and(path("/schema2"))
308 .respond_with(ResponseTemplate::new(200).set_body_string(SCHEMA_2))
309 .mount(&mock_server)
310 .await;
311
312 let mut stream = SchemaSource::URLs {
313 urls: vec![
314 Url::parse(&format!("http://{}/schema1", mock_server.address())).unwrap(),
315 Url::parse(&format!("http://{}/schema2", mock_server.address())).unwrap(),
316 ],
317 }
318 .into_stream();
319
320 assert!(
321 matches!(stream.next().await.unwrap(), UpdateSchema(schema) if schema.sdl == SCHEMA_1)
322 );
323 assert!(matches!(stream.next().await.unwrap(), NoMoreSchema));
324 }
325 .with_subscriber(assert_snapshot_subscriber!())
326 .await;
327 }
328
329 #[test(tokio::test)]
330 async fn schema_by_url_fallback() {
331 async {
332 let mock_server = MockServer::start().await;
333 Mock::given(method("GET"))
334 .and(path("/schema1"))
335 .respond_with(ResponseTemplate::new(400))
336 .mount(&mock_server)
337 .await;
338 Mock::given(method("GET"))
339 .and(path("/schema2"))
340 .respond_with(ResponseTemplate::new(200).set_body_string(SCHEMA_2))
341 .mount(&mock_server)
342 .await;
343
344 let mut stream = SchemaSource::URLs {
345 urls: vec![
346 Url::parse(&format!("http://{}/schema1", mock_server.address())).unwrap(),
347 Url::parse(&format!("http://{}/schema2", mock_server.address())).unwrap(),
348 ],
349 }
350 .into_stream();
351
352 assert!(
353 matches!(stream.next().await.unwrap(), UpdateSchema(schema) if schema.sdl == SCHEMA_2)
354 );
355 assert!(matches!(stream.next().await.unwrap(), NoMoreSchema));
356 }
357 .with_subscriber(assert_snapshot_subscriber!({
358 "[].fields[\"url.full\"]" => "[url.full]"
359 }))
360 .await;
361 }
362
363 #[test(tokio::test)]
364 async fn schema_by_url_all_fail() {
365 async {
366 let mock_server = MockServer::start().await;
367 Mock::given(method("GET"))
368 .and(path("/schema1"))
369 .respond_with(ResponseTemplate::new(400))
370 .mount(&mock_server)
371 .await;
372 Mock::given(method("GET"))
373 .and(path("/schema2"))
374 .respond_with(ResponseTemplate::new(400))
375 .mount(&mock_server)
376 .await;
377
378 let mut stream = SchemaSource::URLs {
379 urls: vec![
380 Url::parse(&format!("http://{}/schema1", mock_server.address())).unwrap(),
381 Url::parse(&format!("http://{}/schema2", mock_server.address())).unwrap(),
382 ],
383 }
384 .into_stream();
385
386 assert!(matches!(stream.next().await.unwrap(), NoMoreSchema));
387 }
388 .with_subscriber(assert_snapshot_subscriber!({
389 "[].fields[\"url.full\"]" => "[url.full]"
390 }))
391 .await;
392 }
393}