apollo_router/router/event/
schema.rs1use std::path::PathBuf;
2use std::pin::Pin;
3use std::time::Duration;
4
5use derivative::Derivative;
6use derive_more::Display;
7use derive_more::From;
8use futures::prelude::*;
9use url::Url;
10
11use crate::router::Event;
12use crate::router::Event::NoMoreSchema;
13use crate::router::Event::UpdateSchema;
14use crate::uplink::UplinkConfig;
15use crate::uplink::schema::SchemaState;
16use crate::uplink::schema_stream::SupergraphSdlQuery;
17use crate::uplink::stream_from_uplink;
18
/// Boxed, pinned, `Send` stream of schema SDL strings; this is the payload carried by
/// `SchemaSource::Stream`.
type SchemaStream = Pin<Box<dyn Stream<Item = String> + Send>>;
20
/// Where the supergraph schema comes from.
///
/// A source is converted into a stream of schema events with `SchemaSource::into_stream`.
#[derive(From, Display, Derivative)]
#[derivative(Debug)]
#[non_exhaustive]
pub enum SchemaSource {
    /// A schema SDL held in memory; `into_stream` emits it exactly once.
    #[display(fmt = "String")]
    Static { schema_sdl: String },

    /// A caller-provided stream of schema SDL strings; every item becomes a schema update.
    /// The stream itself is left out of the `Debug` output.
    #[display(fmt = "Stream")]
    Stream(#[derivative(Debug = "ignore")] SchemaStream),

    /// A schema SDL file on disk.
    #[display(fmt = "File")]
    File {
        /// Path of the schema file.
        path: PathBuf,

        /// When `true`, the file is re-read on every notification from `crate::files::watch`;
        /// when `false`, it is read once.
        watch: bool,

        /// Deprecated: `into_stream` ignores this value.
        #[deprecated]
        delay: Option<Duration>,
    },

    /// A schema obtained through `stream_from_uplink` using this configuration.
    #[display(fmt = "Registry")]
    Registry(UplinkConfig),

    /// A schema downloaded over HTTP from a list of candidate URLs.
    #[display(fmt = "URLs")]
    URLs {
        /// Candidate URLs, tried in order; the first successful response wins.
        urls: Vec<Url>,
        /// When `true`, the URLs are fetched repeatedly; when `false`, only once.
        watch: bool,
        /// Pause inserted before every fetch except the first one.
        period: Duration,
    },
}
64
65impl From<&'_ str> for SchemaSource {
66 fn from(s: &'_ str) -> Self {
67 Self::Static {
68 schema_sdl: s.to_owned(),
69 }
70 }
71}
72
73impl SchemaSource {
74 pub(crate) fn into_stream(self) -> impl Stream<Item = Event> {
76 match self {
77 SchemaSource::Static { schema_sdl: schema } => {
78 let update_schema = UpdateSchema(SchemaState {
79 sdl: schema,
80 launch_id: None,
81 });
82 stream::once(future::ready(update_schema)).boxed()
83 }
84 SchemaSource::Stream(stream) => stream
85 .map(|sdl| {
86 UpdateSchema(SchemaState {
87 sdl,
88 launch_id: None,
89 })
90 })
91 .boxed(),
92 #[allow(deprecated)]
93 SchemaSource::File {
94 path,
95 watch,
96 delay: _,
97 } => {
98 if !path.exists() {
100 tracing::error!(
101 "Supergraph schema at path '{}' does not exist.",
102 path.to_string_lossy()
103 );
104 stream::empty().boxed()
105 } else {
106 match std::fs::read_to_string(&path) {
108 Ok(schema) => {
109 if watch {
110 crate::files::watch(&path)
111 .filter_map(move |_| {
112 let path = path.clone();
113 async move {
114 match tokio::fs::read_to_string(&path).await {
115 Ok(schema) => {
116 let update_schema = UpdateSchema(SchemaState {
117 sdl: schema,
118 launch_id: None,
119 });
120 Some(update_schema)
121 }
122 Err(err) => {
123 tracing::error!(reason = %err, "failed to read supergraph schema");
124 None
125 }
126 }
127 }
128 })
129 .boxed()
130 } else {
131 let update_schema = UpdateSchema(SchemaState {
132 sdl: schema,
133 launch_id: None,
134 });
135 stream::once(future::ready(update_schema)).boxed()
136 }
137 }
138 Err(err) => {
139 tracing::error!(reason = %err, "failed to read supergraph schema");
140 stream::empty().boxed()
141 }
142 }
143 }
144 }
145 SchemaSource::Registry(uplink_config) => {
146 stream_from_uplink::<SupergraphSdlQuery, SchemaState>(uplink_config)
147 .filter_map(|res| {
148 future::ready(match res {
149 Ok(schema) => {
150 let update_schema = UpdateSchema(schema);
151 Some(update_schema)
152 }
153 Err(e) => {
154 tracing::error!("{}", e);
155 None
156 }
157 })
158 })
159 .boxed()
160 }
161 SchemaSource::URLs {
162 urls,
163 watch,
164 period,
165 } => {
166 let mut fetcher = match Fetcher::new(urls, period) {
167 Ok(fetcher) => fetcher,
168 Err(err) => {
169 tracing::error!(reason = %err, "failed to fetch supergraph schema");
170 return stream::empty().boxed();
171 }
172 };
173
174 if watch {
175 stream::unfold(fetcher, |mut state| async move {
176 if state.first_call {
177 state
179 .fetch_supergraph_from_first_viable_url()
180 .await
181 .map(|event| (Some(event), state))
182 } else {
183 Some(match state.fetch_supergraph_from_first_viable_url().await {
185 None => (None, state),
186 Some(event) => (Some(event), state),
187 })
188 }
189 })
190 .filter_map(|s| async move { s })
191 .boxed()
192 } else {
193 futures::stream::once(async move {
194 fetcher.fetch_supergraph_from_first_viable_url().await
195 })
196 .filter_map(|s| async move { s })
197 .boxed()
198 }
199 }
200 }
201 .chain(stream::iter(vec![NoMoreSchema]))
202 .boxed()
203 }
204}
205
/// Errors that can occur while setting up a `Fetcher`.
#[derive(thiserror::Error, Debug)]
enum FetcherError {
    /// The underlying `reqwest` HTTP client could not be built.
    #[error("failed to build http client")]
    InitializationError(#[from] reqwest::Error),
}
211
/// Downloads a supergraph schema over HTTP from a list of candidate URLs.
struct Fetcher {
    /// HTTP client shared by all requests made by this fetcher.
    client: reqwest::Client,
    /// Candidate URLs, tried in order on every fetch.
    urls: Vec<Url>,
    /// Pause inserted before every fetch except the first one.
    period: Duration,
    /// `true` until the first fetch has been started.
    first_call: bool,
}
221
222impl Fetcher {
223 fn new(urls: Vec<Url>, period: Duration) -> Result<Self, FetcherError> {
224 Ok(Self {
225 client: reqwest::Client::builder()
226 .no_gzip()
227 .timeout(Duration::from_secs(10))
228 .build()
229 .map_err(FetcherError::InitializationError)?,
230 urls,
231 period,
232 first_call: true,
233 })
234 }
235 async fn fetch_supergraph_from_first_viable_url(&mut self) -> Option<Event> {
236 if !self.first_call {
238 tokio::time::sleep(self.period).await;
239 }
240 self.first_call = false;
241
242 for url in &self.urls {
243 match self
244 .client
245 .get(reqwest::Url::parse(url.as_ref()).unwrap())
246 .send()
247 .await
248 {
249 Ok(res) if res.status().is_success() => match res.text().await {
250 Ok(schema) => {
251 let update_schema = UpdateSchema(SchemaState {
252 sdl: schema,
253 launch_id: None,
254 });
255 return Some(update_schema);
256 }
257 Err(err) => {
258 tracing::warn!(
259 url.full = %url,
260 reason = %err,
261 "failed to fetch supergraph schema"
262 )
263 }
264 },
265 Ok(res) => tracing::warn!(
266 http.response.status_code = res.status().as_u16(),
267 url.full = %url,
268 "failed to fetch supergraph schema"
269 ),
270 Err(err) => tracing::warn!(
271 url.full = %url,
272 reason = %err,
273 "failed to fetch supergraph schema"
274 ),
275 }
276 }
277 tracing::error!("failed to fetch supergraph schema from all urls");
278 None
279 }
280}
281
#[cfg(test)]
mod tests {
    use std::env::temp_dir;

    use futures::select;
    use test_log::test;
    use tracing_futures::WithSubscriber;
    use wiremock::Mock;
    use wiremock::MockServer;
    use wiremock::ResponseTemplate;
    use wiremock::matchers::method;
    use wiremock::matchers::path;

    use super::*;
    use crate::assert_snapshot_subscriber;
    use crate::files::tests::create_temp_file;
    use crate::files::tests::write_and_flush;

    const SCHEMA_1: &str = "schema1";
    const SCHEMA_2: &str = "schema2";

    /// A `200` response whose body is `body`.
    fn ok_with(body: &'static str) -> ResponseTemplate {
        ResponseTemplate::new(200).set_body_string(body)
    }

    /// A mock answering `GET <route>` with `response`; the caller decides how to mount it.
    fn get_mock(route: &'static str, response: ResponseTemplate) -> Mock {
        Mock::given(method("GET"))
            .and(path(route))
            .respond_with(response)
    }

    /// Absolute URL of `route` on the mock server.
    fn url_on(server: &MockServer, route: &str) -> Url {
        Url::parse(&format!("http://{}{}", server.address(), route)).unwrap()
    }

    /// Whether `event` is a schema update carrying exactly `expected_sdl`.
    fn is_update_with(event: Event, expected_sdl: &str) -> bool {
        matches!(event, UpdateSchema(schema) if schema.sdl == expected_sdl)
    }

    #[test(tokio::test)]
    async fn schema_by_file_watching() {
        let (path, mut file) = create_temp_file();
        let schema = include_str!("../../testdata/supergraph.graphql");
        write_and_flush(&mut file, schema).await;

        let source = SchemaSource::File {
            path,
            watch: true,
            delay: None,
        };
        let mut stream = source.into_stream().boxed();

        // The schema that is already on disk is reported first.
        let first = stream.next().await.unwrap();
        assert!(matches!(first, UpdateSchema(_)));

        // Rewriting the file must produce another update.
        let schema_minimal = include_str!("../../testdata/minimal_supergraph.graphql");
        write_and_flush(&mut file, schema_minimal).await;
        let second = stream.next().await.unwrap();
        assert!(matches!(second, UpdateSchema(_)));
    }

    #[test(tokio::test)]
    async fn schema_by_file_no_watch() {
        let (path, mut file) = create_temp_file();
        let schema = include_str!("../../testdata/supergraph.graphql");
        write_and_flush(&mut file, schema).await;

        let source = SchemaSource::File {
            path,
            watch: false,
            delay: None,
        };
        let mut stream = source.into_stream();

        let first = stream.next().await.unwrap();
        assert!(matches!(first, UpdateSchema(_)));
        let last = stream.next().await.unwrap();
        assert!(matches!(last, NoMoreSchema));
    }

    #[test(tokio::test)]
    async fn schema_by_file_missing() {
        let source = SchemaSource::File {
            path: temp_dir().join("does_not_exist"),
            watch: true,
            delay: None,
        };
        let mut stream = source.into_stream();

        // A missing file yields no schema at all, only the terminal event.
        let only = stream.next().await.unwrap();
        assert!(matches!(only, NoMoreSchema));
    }

    #[test(tokio::test)]
    async fn schema_by_url() {
        async {
            let mock_server = MockServer::start().await;
            get_mock("/schema1", ok_with(SCHEMA_1))
                .mount(&mock_server)
                .await;
            get_mock("/schema2", ok_with(SCHEMA_2))
                .mount(&mock_server)
                .await;

            let source = SchemaSource::URLs {
                urls: vec![
                    url_on(&mock_server, "/schema1"),
                    url_on(&mock_server, "/schema2"),
                ],
                watch: true,
                period: Duration::from_secs(1),
            };
            let mut stream = source.into_stream();

            // The first URL answers, so it wins on both polls.
            assert!(is_update_with(stream.next().await.unwrap(), SCHEMA_1));
            assert!(is_update_with(stream.next().await.unwrap(), SCHEMA_1));
        }
        .with_subscriber(assert_snapshot_subscriber!())
        .await;
    }

    #[test(tokio::test)]
    async fn schema_by_url_fallback() {
        async {
            let mock_server = MockServer::start().await;
            get_mock("/schema1", ResponseTemplate::new(400))
                .mount(&mock_server)
                .await;
            get_mock("/schema2", ok_with(SCHEMA_2))
                .mount(&mock_server)
                .await;

            let source = SchemaSource::URLs {
                urls: vec![
                    url_on(&mock_server, "/schema1"),
                    url_on(&mock_server, "/schema2"),
                ],
                watch: true,
                period: Duration::from_secs(1),
            };
            let mut stream = source.into_stream();

            // The first URL fails, so the second one is used on both polls.
            assert!(is_update_with(stream.next().await.unwrap(), SCHEMA_2));
            assert!(is_update_with(stream.next().await.unwrap(), SCHEMA_2));
        }
        .with_subscriber(assert_snapshot_subscriber!({
            "[].fields[\"url.full\"]" => "[url.full]"
        }))
        .await;
    }

    #[test(tokio::test)]
    async fn schema_by_url_all_fail() {
        async {
            let mock_server = MockServer::start().await;
            get_mock("/schema1", ResponseTemplate::new(400))
                .mount(&mock_server)
                .await;
            get_mock("/schema2", ResponseTemplate::new(400))
                .mount(&mock_server)
                .await;

            let source = SchemaSource::URLs {
                urls: vec![
                    url_on(&mock_server, "/schema1"),
                    url_on(&mock_server, "/schema2"),
                ],
                watch: true,
                period: Duration::from_secs(1),
            };
            let mut stream = source.into_stream();

            let only = stream.next().await.unwrap();
            assert!(matches!(only, NoMoreSchema));
        }
        .with_subscriber(assert_snapshot_subscriber!({
            "[].fields[\"url.full\"]" => "[url.full]"
        }))
        .await;
    }

    #[test(tokio::test)]
    async fn schema_success_fail_success() {
        async {
            let mock_server = MockServer::start().await;
            let source = SchemaSource::URLs {
                urls: vec![url_on(&mock_server, "/schema1")],
                watch: true,
                period: Duration::from_secs(1),
            };
            let mut stream = source.into_stream().boxed().fuse();

            // Scoped mount: the route disappears again when the guard is dropped.
            let success = get_mock("/schema1", ok_with(SCHEMA_1))
                .mount_as_scoped(&mock_server)
                .await;

            assert!(is_update_with(stream.next().await.unwrap(), SCHEMA_1));

            drop(success);

            // With the route gone, no event may arrive before the timeout fires.
            assert!(select! {
                _res = stream.next() => false,
                _res = tokio::time::sleep(Duration::from_secs(2)).boxed().fuse() => true,

            });

            // Once the route is back, the stream picks up the schema again.
            get_mock("/schema1", ok_with(SCHEMA_1))
                .mount(&mock_server)
                .await;

            assert!(is_update_with(stream.next().await.unwrap(), SCHEMA_1));
        }
        .with_subscriber(assert_snapshot_subscriber!({
            "[].fields[\"url.full\"]" => "[url.full]"
        }))
        .await;
    }

    #[test(tokio::test)]
    async fn schema_no_watch() {
        async {
            let mock_server = MockServer::start().await;
            get_mock("/schema1", ok_with(SCHEMA_1))
                .mount(&mock_server)
                .await;

            let source = SchemaSource::URLs {
                urls: vec![url_on(&mock_server, "/schema1")],
                watch: false,
                period: Duration::from_secs(1),
            };
            let mut stream = source.into_stream();

            assert!(is_update_with(stream.next().await.unwrap(), SCHEMA_1));
            let last = stream.next().await.unwrap();
            assert!(matches!(last, NoMoreSchema));
        }
        .with_subscriber(assert_snapshot_subscriber!())
        .await;
    }
}