apollo_router/router/event/
schema.rs

1use std::path::PathBuf;
2use std::pin::Pin;
3use std::time::Duration;
4
5use derivative::Derivative;
6use derive_more::Display;
7use derive_more::From;
8use futures::prelude::*;
9use url::Url;
10
11use crate::registry::OciConfig;
12use crate::registry::fetch_oci;
13use crate::router::Event;
14use crate::router::Event::NoMoreSchema;
15use crate::router::Event::UpdateSchema;
16use crate::uplink::UplinkConfig;
17use crate::uplink::schema::SchemaState;
18use crate::uplink::schema_stream::SupergraphSdlQuery;
19use crate::uplink::stream_from_uplink;
20
21type SchemaStream = Pin<Box<dyn Stream<Item = String> + Send>>;
22
23/// The user supplied schema. Either a static string or a stream for hot reloading.
24#[derive(From, Display, Derivative)]
25#[derivative(Debug)]
26#[non_exhaustive]
27pub enum SchemaSource {
28    /// A static schema.
29    #[display("String")]
30    Static { schema_sdl: String },
31
32    /// A stream of schema.
33    #[display("Stream")]
34    Stream(#[derivative(Debug = "ignore")] SchemaStream),
35
36    /// A YAML file that may be watched for changes.
37    #[display("File")]
38    File {
39        /// The path of the schema file.
40        path: PathBuf,
41
42        /// `true` to watch the file for changes and hot apply them.
43        watch: bool,
44    },
45
46    /// Apollo managed federation.
47    #[display("Registry")]
48    Registry(UplinkConfig),
49
50    /// A list of URLs to fetch the schema from.
51    #[display("URLs")]
52    URLs {
53        /// The URLs to fetch the schema from.
54        urls: Vec<Url>,
55    },
56
57    #[display("Registry")]
58    OCI(OciConfig),
59}
60
61impl From<&'_ str> for SchemaSource {
62    fn from(s: &'_ str) -> Self {
63        Self::Static {
64            schema_sdl: s.to_owned(),
65        }
66    }
67}
68
69impl SchemaSource {
70    /// Convert this schema into a stream regardless of if is static or not. Allows for unified handling later.
71    pub(crate) fn into_stream(self) -> impl Stream<Item = Event> {
72        match self {
73            SchemaSource::Static { schema_sdl: schema } => {
74                let update_schema = UpdateSchema(SchemaState {
75                    sdl: schema,
76                    launch_id: None,
77                });
78                stream::once(future::ready(update_schema)).boxed()
79            }
80            SchemaSource::Stream(stream) => stream
81                .map(|sdl| {
82                    UpdateSchema(SchemaState {
83                        sdl,
84                        launch_id: None,
85                    })
86                })
87                .boxed(),
88            SchemaSource::File {
89                path,
90                watch,
91            } => {
92                // Sanity check, does the schema file exists, if it doesn't then bail.
93                if !path.exists() {
94                    tracing::error!(
95                        "Supergraph schema at path '{}' does not exist.",
96                        path.to_string_lossy()
97                    );
98                    stream::empty().boxed()
99                } else {
100                    //The schema file exists try and load it
101                    match std::fs::read_to_string(&path) {
102                        Ok(schema) => {
103                            if watch {
104                                crate::files::watch(&path)
105                                    .filter_map(move |_| {
106                                        let path = path.clone();
107                                        async move {
108                                            match tokio::fs::read_to_string(&path).await {
109                                                Ok(schema) => {
110                                                    let update_schema = UpdateSchema(SchemaState {
111                                                        sdl: schema,
112                                                        launch_id: None,
113                                                    });
114                                                    Some(update_schema)
115                                                }
116                                                Err(err) => {
117                                                    tracing::error!(reason = %err, "failed to read supergraph schema");
118                                                    None
119                                                }
120                                            }
121                                        }
122                                    })
123                                    .boxed()
124                            } else {
125                                let update_schema = UpdateSchema(SchemaState {
126                                    sdl: schema,
127                                    launch_id: None,
128                                });
129                                stream::once(future::ready(update_schema)).boxed()
130                            }
131                        }
132                        Err(err) => {
133                            tracing::error!(reason = %err, "failed to read supergraph schema");
134                            stream::empty().boxed()
135                        }
136                    }
137                }
138            }
139            SchemaSource::Registry(uplink_config) => {
140                stream_from_uplink::<SupergraphSdlQuery, SchemaState>(uplink_config)
141                    .filter_map(|res| {
142                        future::ready(match res {
143                            Ok(schema) => {
144                                let update_schema = UpdateSchema(schema);
145                                Some(update_schema)
146                            }
147                            Err(e) => {
148                                tracing::error!("{}", e);
149                                None
150                            }
151                        })
152                    })
153                    .boxed()
154            }
155            SchemaSource::URLs { urls } => {
156                futures::stream::once(async move {
157                    fetch_supergraph_from_first_viable_url(&urls).await
158                })
159                .filter_map(|s| async move { s.map(Event::UpdateSchema) })
160                .boxed()
161            }
162            SchemaSource::OCI(oci_config) => {
163                tracing::debug!("using oci as schema source");
164                futures::stream::once(async move {
165                    match fetch_oci(oci_config).await {
166                        Ok(oci_result) => {
167                            tracing::debug!("fetched schema from oci registry");
168                            Some(SchemaState {
169                                sdl: oci_result.schema,
170                                launch_id: None,
171                            })
172                        }
173                        Err(err) => {
174                            tracing::error!("error fetching schema from oci registry {}", err);
175                            None
176                        }
177                    }
178                })
179                    .filter_map(|s| async move { s.map(Event::UpdateSchema) })
180                    .boxed()
181            }
182        }
183        .chain(stream::iter(vec![NoMoreSchema]))
184        .boxed()
185    }
186}
187
188// Encapsulates fetching the schema from the first viable url.
189// It will try each url in order until it finds one that works.
190async fn fetch_supergraph_from_first_viable_url(urls: &[Url]) -> Option<SchemaState> {
191    let Ok(client) = reqwest::Client::builder()
192        .no_gzip()
193        .timeout(Duration::from_secs(10))
194        .build()
195    else {
196        tracing::error!("failed to create HTTP client to fetch supergraph schema");
197        return None;
198    };
199    for url in urls {
200        match client
201            .get(reqwest::Url::parse(url.as_ref()).unwrap())
202            .send()
203            .await
204        {
205            Ok(res) if res.status().is_success() => match res.text().await {
206                Ok(schema) => {
207                    return Some(SchemaState {
208                        sdl: schema,
209                        launch_id: None,
210                    });
211                }
212                Err(err) => {
213                    tracing::warn!(
214                        url.full = %url,
215                        reason = %err,
216                        "failed to fetch supergraph schema"
217                    )
218                }
219            },
220            Ok(res) => tracing::warn!(
221                http.response.status_code = res.status().as_u16(),
222                url.full = %url,
223                "failed to fetch supergraph schema"
224            ),
225            Err(err) => tracing::warn!(
226                url.full = %url,
227                reason = %err,
228                "failed to fetch supergraph schema"
229            ),
230        }
231    }
232    tracing::error!("failed to fetch supergraph schema from all urls");
233    None
234}
235
236#[cfg(test)]
237mod tests {
238    use std::env::temp_dir;
239
240    use test_log::test;
241    use tracing_futures::WithSubscriber;
242    use wiremock::Mock;
243    use wiremock::MockServer;
244    use wiremock::ResponseTemplate;
245    use wiremock::matchers::method;
246    use wiremock::matchers::path;
247
248    use super::*;
249    use crate::assert_snapshot_subscriber;
250    use crate::files::tests::create_temp_file;
251    use crate::files::tests::write_and_flush;
252
253    #[test(tokio::test)]
254    async fn schema_by_file_watching() {
255        let (path, mut file) = create_temp_file();
256        let schema = include_str!("../../testdata/supergraph.graphql");
257        write_and_flush(&mut file, schema).await;
258        let mut stream = SchemaSource::File { path, watch: true }
259            .into_stream()
260            .boxed();
261
262        // First update is guaranteed
263        assert!(matches!(stream.next().await.unwrap(), UpdateSchema(_)));
264
265        // Need different contents, since we won't get an event if content is the same
266        let schema_minimal = include_str!("../../testdata/minimal_supergraph.graphql");
267        // Modify the file and try again
268        write_and_flush(&mut file, schema_minimal).await;
269        assert!(matches!(stream.next().await.unwrap(), UpdateSchema(_)));
270    }
271
272    #[test(tokio::test)]
273    async fn schema_by_file_no_watch() {
274        let (path, mut file) = create_temp_file();
275        let schema = include_str!("../../testdata/supergraph.graphql");
276        write_and_flush(&mut file, schema).await;
277
278        let mut stream = SchemaSource::File { path, watch: false }.into_stream();
279        assert!(matches!(stream.next().await.unwrap(), UpdateSchema(_)));
280        assert!(matches!(stream.next().await.unwrap(), NoMoreSchema));
281    }
282
283    #[test(tokio::test)]
284    async fn schema_by_file_missing() {
285        let mut stream = SchemaSource::File {
286            path: temp_dir().join("does_not_exist"),
287            watch: true,
288        }
289        .into_stream();
290
291        // First update fails because the file is invalid.
292        assert!(matches!(stream.next().await.unwrap(), NoMoreSchema));
293    }
294
295    const SCHEMA_1: &str = "schema1";
296    const SCHEMA_2: &str = "schema2";
297    #[test(tokio::test)]
298    async fn schema_by_url() {
299        async {
300            let mock_server = MockServer::start().await;
301            Mock::given(method("GET"))
302                .and(path("/schema1"))
303                .respond_with(ResponseTemplate::new(200).set_body_string(SCHEMA_1))
304                .mount(&mock_server)
305                .await;
306            Mock::given(method("GET"))
307                .and(path("/schema2"))
308                .respond_with(ResponseTemplate::new(200).set_body_string(SCHEMA_2))
309                .mount(&mock_server)
310                .await;
311
312            let mut stream = SchemaSource::URLs {
313                urls: vec![
314                    Url::parse(&format!("http://{}/schema1", mock_server.address())).unwrap(),
315                    Url::parse(&format!("http://{}/schema2", mock_server.address())).unwrap(),
316                ],
317            }
318            .into_stream();
319
320            assert!(
321                matches!(stream.next().await.unwrap(), UpdateSchema(schema) if schema.sdl == SCHEMA_1)
322            );
323            assert!(matches!(stream.next().await.unwrap(), NoMoreSchema));
324        }
325        .with_subscriber(assert_snapshot_subscriber!())
326        .await;
327    }
328
329    #[test(tokio::test)]
330    async fn schema_by_url_fallback() {
331        async {
332            let mock_server = MockServer::start().await;
333            Mock::given(method("GET"))
334                .and(path("/schema1"))
335                .respond_with(ResponseTemplate::new(400))
336                .mount(&mock_server)
337                .await;
338            Mock::given(method("GET"))
339                .and(path("/schema2"))
340                .respond_with(ResponseTemplate::new(200).set_body_string(SCHEMA_2))
341                .mount(&mock_server)
342                .await;
343
344            let mut stream = SchemaSource::URLs {
345                urls: vec![
346                    Url::parse(&format!("http://{}/schema1", mock_server.address())).unwrap(),
347                    Url::parse(&format!("http://{}/schema2", mock_server.address())).unwrap(),
348                ],
349            }
350            .into_stream();
351
352            assert!(
353                matches!(stream.next().await.unwrap(), UpdateSchema(schema) if schema.sdl == SCHEMA_2)
354            );
355            assert!(matches!(stream.next().await.unwrap(), NoMoreSchema));
356        }
357        .with_subscriber(assert_snapshot_subscriber!({
358            "[].fields[\"url.full\"]" => "[url.full]"
359        }))
360        .await;
361    }
362
363    #[test(tokio::test)]
364    async fn schema_by_url_all_fail() {
365        async {
366            let mock_server = MockServer::start().await;
367            Mock::given(method("GET"))
368                .and(path("/schema1"))
369                .respond_with(ResponseTemplate::new(400))
370                .mount(&mock_server)
371                .await;
372            Mock::given(method("GET"))
373                .and(path("/schema2"))
374                .respond_with(ResponseTemplate::new(400))
375                .mount(&mock_server)
376                .await;
377
378            let mut stream = SchemaSource::URLs {
379                urls: vec![
380                    Url::parse(&format!("http://{}/schema1", mock_server.address())).unwrap(),
381                    Url::parse(&format!("http://{}/schema2", mock_server.address())).unwrap(),
382                ],
383            }
384            .into_stream();
385
386            assert!(matches!(stream.next().await.unwrap(), NoMoreSchema));
387        }
388        .with_subscriber(assert_snapshot_subscriber!({
389            "[].fields[\"url.full\"]" => "[url.full]"
390        }))
391        .await;
392    }
393}