Skip to main content

apollo_router/router/event/
schema.rs

1use std::path::PathBuf;
2use std::pin::Pin;
3use std::time::Duration;
4
5use derivative::Derivative;
6use derive_more::Display;
7use derive_more::From;
8use futures::prelude::*;
9use url::Url;
10
11use crate::router::Event;
12use crate::router::Event::NoMoreSchema;
13use crate::router::Event::UpdateSchema;
14use crate::uplink::UplinkConfig;
15use crate::uplink::schema::SchemaState;
16use crate::uplink::schema_stream::SupergraphSdlQuery;
17use crate::uplink::stream_from_uplink;
18
19type SchemaStream = Pin<Box<dyn Stream<Item = String> + Send>>;
20
21/// The user supplied schema. Either a static string or a stream for hot reloading.
22#[derive(From, Display, Derivative)]
23#[derivative(Debug)]
24#[non_exhaustive]
25pub enum SchemaSource {
26    /// A static schema.
27    #[display(fmt = "String")]
28    Static { schema_sdl: String },
29
30    /// A stream of schema.
31    #[display(fmt = "Stream")]
32    Stream(#[derivative(Debug = "ignore")] SchemaStream),
33
34    /// A YAML file that may be watched for changes.
35    #[display(fmt = "File")]
36    File {
37        /// The path of the schema file.
38        path: PathBuf,
39
40        /// `true` to watch the file for changes and hot apply them.
41        watch: bool,
42
43        /// When watching, the delay to wait before applying the new schema.
44        /// Note: This variable is deprecated and has no effect.
45        #[deprecated]
46        delay: Option<Duration>,
47    },
48
49    /// Apollo managed federation.
50    #[display(fmt = "Registry")]
51    Registry(UplinkConfig),
52
53    /// A list of URLs to fetch the schema from.
54    #[display(fmt = "URLs")]
55    URLs {
56        /// The URLs to fetch the schema from.
57        urls: Vec<Url>,
58        /// `true` to watch the URLs for changes and hot apply them.
59        watch: bool,
60        /// When watching, the delay to wait between each poll.
61        period: Duration,
62    },
63}
64
65impl From<&'_ str> for SchemaSource {
66    fn from(s: &'_ str) -> Self {
67        Self::Static {
68            schema_sdl: s.to_owned(),
69        }
70    }
71}
72
73impl SchemaSource {
74    /// Convert this schema into a stream regardless of if is static or not. Allows for unified handling later.
75    pub(crate) fn into_stream(self) -> impl Stream<Item = Event> {
76        match self {
77            SchemaSource::Static { schema_sdl: schema } => {
78                let update_schema = UpdateSchema(SchemaState {
79                    sdl: schema,
80                    launch_id: None,
81                });
82                stream::once(future::ready(update_schema)).boxed()
83            }
84            SchemaSource::Stream(stream) => stream
85                .map(|sdl| {
86                    UpdateSchema(SchemaState {
87                        sdl,
88                        launch_id: None,
89                    })
90                })
91                .boxed(),
92            #[allow(deprecated)]
93            SchemaSource::File {
94                path,
95                watch,
96                delay: _,
97            } => {
98                // Sanity check, does the schema file exists, if it doesn't then bail.
99                if !path.exists() {
100                    tracing::error!(
101                        "Supergraph schema at path '{}' does not exist.",
102                        path.to_string_lossy()
103                    );
104                    stream::empty().boxed()
105                } else {
106                    //The schema file exists try and load it
107                    match std::fs::read_to_string(&path) {
108                        Ok(schema) => {
109                            if watch {
110                                crate::files::watch(&path)
111                                    .filter_map(move |_| {
112                                        let path = path.clone();
113                                        async move {
114                                            match tokio::fs::read_to_string(&path).await {
115                                                Ok(schema) => {
116                                                    let update_schema = UpdateSchema(SchemaState {
117                                                        sdl: schema,
118                                                        launch_id: None,
119                                                    });
120                                                    Some(update_schema)
121                                                }
122                                                Err(err) => {
123                                                    tracing::error!(reason = %err, "failed to read supergraph schema");
124                                                    None
125                                                }
126                                            }
127                                        }
128                                    })
129                                    .boxed()
130                            } else {
131                                let update_schema = UpdateSchema(SchemaState {
132                                    sdl: schema,
133                                    launch_id: None,
134                                });
135                                stream::once(future::ready(update_schema)).boxed()
136                            }
137                        }
138                        Err(err) => {
139                            tracing::error!(reason = %err, "failed to read supergraph schema");
140                            stream::empty().boxed()
141                        }
142                    }
143                }
144            }
145            SchemaSource::Registry(uplink_config) => {
146                stream_from_uplink::<SupergraphSdlQuery, SchemaState>(uplink_config)
147                    .filter_map(|res| {
148                        future::ready(match res {
149                            Ok(schema) => {
150                                let update_schema = UpdateSchema(schema);
151                                Some(update_schema)
152                            }
153                            Err(e) => {
154                                tracing::error!("{}", e);
155                                None
156                            }
157                        })
158                    })
159                    .boxed()
160            }
161            SchemaSource::URLs {
162                urls,
163                watch,
164                period,
165            } => {
166                let mut fetcher = match Fetcher::new(urls, period) {
167                    Ok(fetcher) => fetcher,
168                    Err(err) => {
169                        tracing::error!(reason = %err, "failed to fetch supergraph schema");
170                        return stream::empty().boxed();
171                    }
172                };
173
174                if watch {
175                    stream::unfold(fetcher, |mut state| async move {
176                        if state.first_call {
177                            // First call we may terminate the stream if there are no viable urls, None may be returned
178                            state
179                                .fetch_supergraph_from_first_viable_url()
180                                .await
181                                .map(|event| (Some(event), state))
182                        } else {
183                            // Subsequent calls we don't want to terminate the stream, so we always return Some
184                            Some(match state.fetch_supergraph_from_first_viable_url().await {
185                                None => (None, state),
186                                Some(event) => (Some(event), state),
187                            })
188                        }
189                    })
190                    .filter_map(|s| async move { s })
191                    .boxed()
192                } else {
193                    futures::stream::once(async move {
194                        fetcher.fetch_supergraph_from_first_viable_url().await
195                    })
196                    .filter_map(|s| async move { s })
197                    .boxed()
198                }
199            }
200        }
201        .chain(stream::iter(vec![NoMoreSchema]))
202        .boxed()
203    }
204}
205
206#[derive(thiserror::Error, Debug)]
207enum FetcherError {
208    #[error("failed to build http client")]
209    InitializationError(#[from] reqwest::Error),
210}
211
212// Encapsulates fetching the schema from the first viable url.
213// It will try each url in order until it finds one that works.
214// On the second and subsequent calls it will wait for the period before making the call.
215struct Fetcher {
216    client: reqwest::Client,
217    urls: Vec<Url>,
218    period: Duration,
219    first_call: bool,
220}
221
222impl Fetcher {
223    fn new(urls: Vec<Url>, period: Duration) -> Result<Self, FetcherError> {
224        Ok(Self {
225            client: reqwest::Client::builder()
226                .no_gzip()
227                .timeout(Duration::from_secs(10))
228                .build()
229                .map_err(FetcherError::InitializationError)?,
230            urls,
231            period,
232            first_call: true,
233        })
234    }
235    async fn fetch_supergraph_from_first_viable_url(&mut self) -> Option<Event> {
236        // If this is not the first call then we need to wait for the period before trying again.
237        if !self.first_call {
238            tokio::time::sleep(self.period).await;
239        }
240        self.first_call = false;
241
242        for url in &self.urls {
243            match self
244                .client
245                .get(reqwest::Url::parse(url.as_ref()).unwrap())
246                .send()
247                .await
248            {
249                Ok(res) if res.status().is_success() => match res.text().await {
250                    Ok(schema) => {
251                        let update_schema = UpdateSchema(SchemaState {
252                            sdl: schema,
253                            launch_id: None,
254                        });
255                        return Some(update_schema);
256                    }
257                    Err(err) => {
258                        tracing::warn!(
259                            url.full = %url,
260                            reason = %err,
261                            "failed to fetch supergraph schema"
262                        )
263                    }
264                },
265                Ok(res) => tracing::warn!(
266                    http.response.status_code = res.status().as_u16(),
267                    url.full = %url,
268                    "failed to fetch supergraph schema"
269                ),
270                Err(err) => tracing::warn!(
271                    url.full = %url,
272                    reason = %err,
273                    "failed to fetch supergraph schema"
274                ),
275            }
276        }
277        tracing::error!("failed to fetch supergraph schema from all urls");
278        None
279    }
280}
281
#[cfg(test)]
mod tests {
    use std::env::temp_dir;

    use futures::select;
    use test_log::test;
    use tracing_futures::WithSubscriber;
    use wiremock::Mock;
    use wiremock::MockServer;
    use wiremock::ResponseTemplate;
    use wiremock::matchers::method;
    use wiremock::matchers::path;

    use super::*;
    use crate::assert_snapshot_subscriber;
    use crate::files::tests::create_temp_file;
    use crate::files::tests::write_and_flush;

    const SCHEMA_1: &str = "schema1";
    const SCHEMA_2: &str = "schema2";

    /// Registers a `GET` handler for `route` on `server` that answers with `response`.
    async fn mount_get(server: &MockServer, route: &str, response: ResponseTemplate) {
        Mock::given(method("GET"))
            .and(path(route))
            .respond_with(response)
            .mount(server)
            .await;
    }

    /// A `200` response whose body is `body`.
    fn ok_with(body: &str) -> ResponseTemplate {
        ResponseTemplate::new(200).set_body_string(body)
    }

    /// Absolute url of `route` (which starts with `/`) on the mock server.
    fn endpoint(server: &MockServer, route: &str) -> Url {
        Url::parse(&format!("http://{}{}", server.address(), route)).unwrap()
    }

    /// A `SchemaSource::URLs` over `routes` of the mock server, polling every second.
    fn url_source(server: &MockServer, routes: &[&str], watch: bool) -> SchemaSource {
        SchemaSource::URLs {
            urls: routes.iter().map(|route| endpoint(server, route)).collect(),
            watch,
            period: Duration::from_secs(1),
        }
    }

    /// `true` when `event` is an `UpdateSchema` whose sdl equals `sdl`.
    fn is_update_with(event: Event, sdl: &str) -> bool {
        matches!(event, UpdateSchema(state) if state.sdl == sdl)
    }

    #[test(tokio::test)]
    async fn schema_by_file_watching() {
        let (path, mut file) = create_temp_file();
        let initial = include_str!("../../testdata/supergraph.graphql");
        write_and_flush(&mut file, initial).await;
        let source = SchemaSource::File {
            path,
            watch: true,
            delay: None,
        };
        let mut events = source.into_stream().boxed();

        // The first update always arrives.
        let first = events.next().await.unwrap();
        assert!(matches!(first, UpdateSchema(_)));

        // The content has to differ, otherwise no event is produced for the rewrite.
        let changed = include_str!("../../testdata/minimal_supergraph.graphql");
        // Rewrite the file and expect a second update.
        write_and_flush(&mut file, changed).await;
        let second = events.next().await.unwrap();
        assert!(matches!(second, UpdateSchema(_)));
    }

    #[test(tokio::test)]
    async fn schema_by_file_no_watch() {
        let (path, mut file) = create_temp_file();
        let initial = include_str!("../../testdata/supergraph.graphql");
        write_and_flush(&mut file, initial).await;

        let source = SchemaSource::File {
            path,
            watch: false,
            delay: None,
        };
        let mut events = source.into_stream();

        let first = events.next().await.unwrap();
        assert!(matches!(first, UpdateSchema(_)));
        let last = events.next().await.unwrap();
        assert!(matches!(last, NoMoreSchema));
    }

    #[test(tokio::test)]
    async fn schema_by_file_missing() {
        let source = SchemaSource::File {
            path: temp_dir().join("does_not_exist"),
            watch: true,
            delay: None,
        };
        let mut events = source.into_stream();

        // No schema can be read from a missing file, so the stream ends straight away.
        let only = events.next().await.unwrap();
        assert!(matches!(only, NoMoreSchema));
    }

    #[test(tokio::test)]
    async fn schema_by_url() {
        async {
            let server = MockServer::start().await;
            mount_get(&server, "/schema1", ok_with(SCHEMA_1)).await;
            mount_get(&server, "/schema2", ok_with(SCHEMA_2)).await;

            let mut events = url_source(&server, &["/schema1", "/schema2"], true).into_stream();

            // The first url is healthy, so both polls are served from it.
            assert!(is_update_with(events.next().await.unwrap(), SCHEMA_1));
            assert!(is_update_with(events.next().await.unwrap(), SCHEMA_1));
        }
        .with_subscriber(assert_snapshot_subscriber!())
        .await;
    }

    #[test(tokio::test)]
    async fn schema_by_url_fallback() {
        async {
            let server = MockServer::start().await;
            mount_get(&server, "/schema1", ResponseTemplate::new(400)).await;
            mount_get(&server, "/schema2", ok_with(SCHEMA_2)).await;

            let mut events = url_source(&server, &["/schema1", "/schema2"], true).into_stream();

            // The first url fails, so both polls fall back to the second one.
            assert!(is_update_with(events.next().await.unwrap(), SCHEMA_2));
            assert!(is_update_with(events.next().await.unwrap(), SCHEMA_2));
        }
        .with_subscriber(assert_snapshot_subscriber!({
            "[].fields[\"url.full\"]" => "[url.full]"
        }))
        .await;
    }

    #[test(tokio::test)]
    async fn schema_by_url_all_fail() {
        async {
            let server = MockServer::start().await;
            mount_get(&server, "/schema1", ResponseTemplate::new(400)).await;
            mount_get(&server, "/schema2", ResponseTemplate::new(400)).await;

            let mut events = url_source(&server, &["/schema1", "/schema2"], true).into_stream();

            // Nothing viable on the first attempt: the stream terminates immediately.
            assert!(matches!(events.next().await.unwrap(), NoMoreSchema));
        }
        .with_subscriber(assert_snapshot_subscriber!({
            "[].fields[\"url.full\"]" => "[url.full]"
        }))
        .await;
    }

    #[test(tokio::test)]
    async fn schema_success_fail_success() {
        async {
            let server = MockServer::start().await;
            let mut events = url_source(&server, &["/schema1"], true)
                .into_stream()
                .boxed()
                .fuse();

            // Scoped mount: the handler disappears as soon as the guard is dropped.
            let scoped_success = Mock::given(method("GET"))
                .and(path("/schema1"))
                .respond_with(ok_with(SCHEMA_1))
                .mount_as_scoped(&server)
                .await;

            assert!(is_update_with(events.next().await.unwrap(), SCHEMA_1));

            drop(scoped_success);

            // With the endpoint gone no event may arrive before the timer fires.
            let timed_out = select! {
                _res = events.next() => false,
                _res = tokio::time::sleep(Duration::from_secs(2)).boxed().fuse() => true,
            };
            assert!(timed_out);

            // Once the endpoint is back the schema is delivered again.
            mount_get(&server, "/schema1", ok_with(SCHEMA_1)).await;

            assert!(is_update_with(events.next().await.unwrap(), SCHEMA_1));
        }
        .with_subscriber(assert_snapshot_subscriber!({
            "[].fields[\"url.full\"]" => "[url.full]"
        }))
        .await;
    }

    #[test(tokio::test)]
    async fn schema_no_watch() {
        async {
            let server = MockServer::start().await;
            mount_get(&server, "/schema1", ok_with(SCHEMA_1)).await;

            let mut events = url_source(&server, &["/schema1"], false).into_stream();

            // Exactly one update, then the terminating event.
            assert!(is_update_with(events.next().await.unwrap(), SCHEMA_1));
            assert!(matches!(events.next().await.unwrap(), NoMoreSchema));
        }
        .with_subscriber(assert_snapshot_subscriber!())
        .await;
    }
}