Skip to main content

apollo_router/router/event/
schema.rs

1use std::path::PathBuf;
2use std::pin::Pin;
3use std::time::Duration;
4
5use derivative::Derivative;
6use derive_more::Display;
7use derive_more::From;
8use futures::prelude::*;
9use url::Url;
10
11use crate::registry::OciConfig;
12use crate::registry::create_oci_schema_stream;
13use crate::router::Event;
14use crate::router::Event::NoMoreSchema;
15use crate::router::Event::UpdateSchema;
16use crate::uplink::UplinkConfig;
17use crate::uplink::schema::SchemaState;
18use crate::uplink::schema_stream::SupergraphSdlQuery;
19use crate::uplink::stream_from_uplink;
20
/// A pinned, boxed, `Send` stream of `String` items.
///
/// This is the payload of [`SchemaSource::Stream`]; `SchemaSource::into_stream` treats every
/// item as the SDL of a schema update.
type SchemaStream = Pin<Box<dyn Stream<Item = String> + Send>>;
22
/// The user supplied schema. Either a static string or a stream for hot reloading.
///
/// Use `SchemaSource::into_stream` to turn any variant into a uniform stream of router events.
#[derive(From, Display, Derivative)]
#[derivative(Debug)]
#[non_exhaustive]
pub enum SchemaSource {
    /// A static schema.
    #[display("String")]
    Static { schema_sdl: String },

    /// A stream of schema.
    ///
    /// The stream itself is left out of the `Debug` output.
    #[display("Stream")]
    Stream(#[derivative(Debug = "ignore")] SchemaStream),

    /// A schema file that may be watched for changes.
    ///
    /// The file content is read as text and used as the schema SDL.
    #[display("File")]
    File {
        /// The path of the schema file.
        path: PathBuf,

        /// `true` to watch the file for changes and hot apply them.
        watch: bool,
    },

    /// Apollo managed federation.
    #[display("Registry")]
    Registry(UplinkConfig),

    /// A list of URLs to fetch the schema from.
    ///
    /// The URLs are tried in order and the first one that yields a schema is used.
    #[display("URLs")]
    URLs {
        /// The URLs to fetch the schema from.
        urls: Vec<Url>,
    },

    /// A schema stream created from the given `OciConfig` (see `create_oci_schema_stream`).
    // NOTE(review): this variant displays as "Registry", exactly like `Registry` above, so the
    // two cannot be told apart in `Display` output. Confirm whether that is intentional.
    #[display("Registry")]
    OCI(OciConfig),
}
60
61impl From<&'_ str> for SchemaSource {
62    fn from(s: &'_ str) -> Self {
63        Self::Static {
64            schema_sdl: s.to_owned(),
65        }
66    }
67}
68
69impl SchemaSource {
70    /// Convert this schema into a stream regardless of if is static or not. Allows for unified handling later.
71    pub(crate) fn into_stream(self) -> impl Stream<Item = Event> {
72        match self {
73            SchemaSource::Static { schema_sdl: schema } => {
74                let update_schema = UpdateSchema(SchemaState {
75                    sdl: schema,
76                    launch_id: None,
77                });
78                stream::once(future::ready(update_schema)).boxed()
79            }
80            SchemaSource::Stream(stream) => stream
81                .map(|sdl| {
82                    UpdateSchema(SchemaState {
83                        sdl,
84                        launch_id: None,
85                    })
86                })
87                .boxed(),
88            SchemaSource::File {
89                path,
90                watch,
91            } => {
92                // Sanity check, does the schema file exists, if it doesn't then bail.
93                if !path.exists() {
94                    tracing::error!(
95                        "Supergraph schema at path '{}' does not exist.",
96                        path.to_string_lossy()
97                    );
98                    stream::empty().boxed()
99                } else {
100                    //The schema file exists try and load it
101                    match std::fs::read_to_string(&path) {
102                        Ok(schema) => {
103                            if watch {
104                                crate::files::watch(&path)
105                                    .filter_map(move |_| {
106                                        let path = path.clone();
107                                        async move {
108                                            match tokio::fs::read_to_string(&path).await {
109                                                Ok(schema) => {
110                                                    let update_schema = UpdateSchema(SchemaState {
111                                                        sdl: schema,
112                                                        launch_id: None,
113                                                    });
114                                                    Some(update_schema)
115                                                }
116                                                Err(err) => {
117                                                    tracing::error!(reason = %err, "failed to read supergraph schema");
118                                                    None
119                                                }
120                                            }
121                                        }
122                                    })
123                                    .boxed()
124                            } else {
125                                let update_schema = UpdateSchema(SchemaState {
126                                    sdl: schema,
127                                    launch_id: None,
128                                });
129                                stream::once(future::ready(update_schema)).boxed()
130                            }
131                        }
132                        Err(err) => {
133                            tracing::error!(reason = %err, "failed to read supergraph schema");
134                            stream::empty().boxed()
135                        }
136                    }
137                }
138            }
139            SchemaSource::Registry(uplink_config) => {
140                stream_from_uplink::<SupergraphSdlQuery, SchemaState>(uplink_config)
141                    .filter_map(|res| {
142                        future::ready(match res {
143                            Ok(schema) => {
144                                let update_schema = UpdateSchema(schema);
145                                Some(update_schema)
146                            }
147                            Err(e) => {
148                                tracing::error!("{}", e);
149                                None
150                            }
151                        })
152                    })
153                    .boxed()
154            }
155            SchemaSource::URLs { urls } => {
156                futures::stream::once(async move {
157                    fetch_supergraph_from_first_viable_url(&urls).await
158                })
159                .filter_map(|s| async move { s.map(Event::UpdateSchema) })
160                .boxed()
161            }
162            SchemaSource::OCI(oci_config) => {
163                tracing::debug!("using oci as schema source");
164                match create_oci_schema_stream(oci_config) {
165                    Ok(stream) => Pin::new(Box::new(stream))
166                        .filter_map(|res| {
167                            future::ready(match res {
168                                Ok(schema) => {
169                                    let update_schema = UpdateSchema(schema);
170                                    Some(update_schema)
171                                }
172                                Err(e) => {
173                                    tracing::error!("{}", e);
174                                    None
175                                }
176                            })
177                        })
178                        .boxed(),
179                    Err(e) => {
180                        tracing::error!("failed to create OCI schema stream: {}", e);
181                        stream::empty().boxed()
182                    }
183                }
184            }
185        }
186        .chain(stream::iter(vec![NoMoreSchema]))
187        .boxed()
188    }
189}
190
191// Encapsulates fetching the schema from the first viable url.
192// It will try each url in order until it finds one that works.
193async fn fetch_supergraph_from_first_viable_url(urls: &[Url]) -> Option<SchemaState> {
194    let Ok(client) = reqwest::Client::builder()
195        .no_gzip()
196        .timeout(Duration::from_secs(10))
197        .build()
198    else {
199        tracing::error!("failed to create HTTP client to fetch supergraph schema");
200        return None;
201    };
202    for url in urls {
203        match client
204            .get(reqwest::Url::parse(url.as_ref()).unwrap())
205            .send()
206            .await
207        {
208            Ok(res) if res.status().is_success() => match res.text().await {
209                Ok(schema) => {
210                    return Some(SchemaState {
211                        sdl: schema,
212                        launch_id: None,
213                    });
214                }
215                Err(err) => {
216                    tracing::warn!(
217                        url.full = %url,
218                        reason = %err,
219                        "failed to fetch supergraph schema"
220                    )
221                }
222            },
223            Ok(res) => tracing::warn!(
224                http.response.status_code = res.status().as_u16(),
225                url.full = %url,
226                "failed to fetch supergraph schema"
227            ),
228            Err(err) => tracing::warn!(
229                url.full = %url,
230                reason = %err,
231                "failed to fetch supergraph schema"
232            ),
233        }
234    }
235    tracing::error!("failed to fetch supergraph schema from all urls");
236    None
237}
238
// Tests for `SchemaSource::into_stream` covering the file and URL based sources.
#[cfg(test)]
mod tests {
    use std::env::temp_dir;

    use test_log::test;
    use tracing_futures::WithSubscriber;
    use wiremock::Mock;
    use wiremock::MockServer;
    use wiremock::ResponseTemplate;
    use wiremock::matchers::method;
    use wiremock::matchers::path;

    use super::*;
    use crate::assert_snapshot_subscriber;
    use crate::files::tests::create_temp_file;
    use crate::files::tests::write_and_flush;

    // A watched schema file yields an update up front and another one after it is rewritten.
    #[test(tokio::test)]
    async fn schema_by_file_watching() {
        let (path, mut file) = create_temp_file();
        let schema = include_str!("../../testdata/supergraph.graphql");
        write_and_flush(&mut file, schema).await;
        let mut stream = SchemaSource::File { path, watch: true }
            .into_stream()
            .boxed();

        // First update is guaranteed
        assert!(matches!(stream.next().await.unwrap(), UpdateSchema(_)));

        // Need different contents, since we won't get an event if content is the same
        let schema_minimal = include_str!("../../testdata/minimal_supergraph.graphql");
        // Modify the file and try again
        write_and_flush(&mut file, schema_minimal).await;
        assert!(matches!(stream.next().await.unwrap(), UpdateSchema(_)));
    }

    // Without watching, the file yields exactly one update followed by `NoMoreSchema`.
    #[test(tokio::test)]
    async fn schema_by_file_no_watch() {
        let (path, mut file) = create_temp_file();
        let schema = include_str!("../../testdata/supergraph.graphql");
        write_and_flush(&mut file, schema).await;

        let mut stream = SchemaSource::File { path, watch: false }.into_stream();
        assert!(matches!(stream.next().await.unwrap(), UpdateSchema(_)));
        assert!(matches!(stream.next().await.unwrap(), NoMoreSchema));
    }

    // A path that does not exist yields no update at all, even when watching is requested.
    #[test(tokio::test)]
    async fn schema_by_file_missing() {
        let mut stream = SchemaSource::File {
            path: temp_dir().join("does_not_exist"),
            watch: true,
        }
        .into_stream();

        // There is no update because the file does not exist: the stream ends right away.
        assert!(matches!(stream.next().await.unwrap(), NoMoreSchema));
    }

    // Response bodies served by the mock server for `/schema1` and `/schema2`.
    const SCHEMA_1: &str = "schema1";
    const SCHEMA_2: &str = "schema2";
    // When both URLs answer successfully, the schema of the first URL is the one emitted.
    #[test(tokio::test)]
    async fn schema_by_url() {
        async {
            let mock_server = MockServer::start().await;
            Mock::given(method("GET"))
                .and(path("/schema1"))
                .respond_with(ResponseTemplate::new(200).set_body_string(SCHEMA_1))
                .mount(&mock_server)
                .await;
            Mock::given(method("GET"))
                .and(path("/schema2"))
                .respond_with(ResponseTemplate::new(200).set_body_string(SCHEMA_2))
                .mount(&mock_server)
                .await;

            let mut stream = SchemaSource::URLs {
                urls: vec![
                    Url::parse(&format!("http://{}/schema1", mock_server.address())).unwrap(),
                    Url::parse(&format!("http://{}/schema2", mock_server.address())).unwrap(),
                ],
            }
            .into_stream();

            assert!(
                matches!(stream.next().await.unwrap(), UpdateSchema(schema) if schema.sdl == SCHEMA_1)
            );
            assert!(matches!(stream.next().await.unwrap(), NoMoreSchema));
        }
        .with_subscriber(assert_snapshot_subscriber!())
        .await;
    }

    // When the first URL answers with an error status, the second URL's schema is emitted.
    #[test(tokio::test)]
    async fn schema_by_url_fallback() {
        async {
            let mock_server = MockServer::start().await;
            Mock::given(method("GET"))
                .and(path("/schema1"))
                .respond_with(ResponseTemplate::new(400))
                .mount(&mock_server)
                .await;
            Mock::given(method("GET"))
                .and(path("/schema2"))
                .respond_with(ResponseTemplate::new(200).set_body_string(SCHEMA_2))
                .mount(&mock_server)
                .await;

            let mut stream = SchemaSource::URLs {
                urls: vec![
                    Url::parse(&format!("http://{}/schema1", mock_server.address())).unwrap(),
                    Url::parse(&format!("http://{}/schema2", mock_server.address())).unwrap(),
                ],
            }
            .into_stream();

            assert!(
                matches!(stream.next().await.unwrap(), UpdateSchema(schema) if schema.sdl == SCHEMA_2)
            );
            assert!(matches!(stream.next().await.unwrap(), NoMoreSchema));
        }
        // NOTE(review): the mapping below presumably redacts the logged `url.full` field so the
        // snapshot does not depend on the mock server's address; confirm against the macro.
        .with_subscriber(assert_snapshot_subscriber!({
            "[].fields[\"url.full\"]" => "[url.full]"
        }))
        .await;
    }

    // When every URL answers with an error status, no update is emitted at all.
    #[test(tokio::test)]
    async fn schema_by_url_all_fail() {
        async {
            let mock_server = MockServer::start().await;
            Mock::given(method("GET"))
                .and(path("/schema1"))
                .respond_with(ResponseTemplate::new(400))
                .mount(&mock_server)
                .await;
            Mock::given(method("GET"))
                .and(path("/schema2"))
                .respond_with(ResponseTemplate::new(400))
                .mount(&mock_server)
                .await;

            let mut stream = SchemaSource::URLs {
                urls: vec![
                    Url::parse(&format!("http://{}/schema1", mock_server.address())).unwrap(),
                    Url::parse(&format!("http://{}/schema2", mock_server.address())).unwrap(),
                ],
            }
            .into_stream();

            assert!(matches!(stream.next().await.unwrap(), NoMoreSchema));
        }
        // NOTE(review): same presumed `url.full` redaction as in `schema_by_url_fallback`.
        .with_subscriber(assert_snapshot_subscriber!({
            "[].fields[\"url.full\"]" => "[url.full]"
        }))
        .await;
    }
}
396}