apollo_router/router/event/
schema.rs1use std::path::PathBuf;
2use std::pin::Pin;
3use std::time::Duration;
4
5use derivative::Derivative;
6use derive_more::Display;
7use derive_more::From;
8use futures::prelude::*;
9use url::Url;
10
11use crate::registry::OciConfig;
12use crate::registry::create_oci_schema_stream;
13use crate::router::Event;
14use crate::router::Event::NoMoreSchema;
15use crate::router::Event::UpdateSchema;
16use crate::uplink::UplinkConfig;
17use crate::uplink::schema::SchemaState;
18use crate::uplink::schema_stream::SupergraphSdlQuery;
19use crate::uplink::stream_from_uplink;
20
21type SchemaStream = Pin<Box<dyn Stream<Item = String> + Send>>;
22
23#[derive(From, Display, Derivative)]
25#[derivative(Debug)]
26#[non_exhaustive]
27pub enum SchemaSource {
28 #[display("String")]
30 Static { schema_sdl: String },
31
32 #[display("Stream")]
34 Stream(#[derivative(Debug = "ignore")] SchemaStream),
35
36 #[display("File")]
38 File {
39 path: PathBuf,
41
42 watch: bool,
44 },
45
46 #[display("Registry")]
48 Registry(UplinkConfig),
49
50 #[display("URLs")]
52 URLs {
53 urls: Vec<Url>,
55 },
56
57 #[display("Registry")]
58 OCI(OciConfig),
59}
60
61impl From<&'_ str> for SchemaSource {
62 fn from(s: &'_ str) -> Self {
63 Self::Static {
64 schema_sdl: s.to_owned(),
65 }
66 }
67}
68
69impl SchemaSource {
70 pub(crate) fn into_stream(self) -> impl Stream<Item = Event> {
72 match self {
73 SchemaSource::Static { schema_sdl: schema } => {
74 let update_schema = UpdateSchema(SchemaState {
75 sdl: schema,
76 launch_id: None,
77 });
78 stream::once(future::ready(update_schema)).boxed()
79 }
80 SchemaSource::Stream(stream) => stream
81 .map(|sdl| {
82 UpdateSchema(SchemaState {
83 sdl,
84 launch_id: None,
85 })
86 })
87 .boxed(),
88 SchemaSource::File {
89 path,
90 watch,
91 } => {
92 if !path.exists() {
94 tracing::error!(
95 "Supergraph schema at path '{}' does not exist.",
96 path.to_string_lossy()
97 );
98 stream::empty().boxed()
99 } else {
100 match std::fs::read_to_string(&path) {
102 Ok(schema) => {
103 if watch {
104 crate::files::watch(&path)
105 .filter_map(move |_| {
106 let path = path.clone();
107 async move {
108 match tokio::fs::read_to_string(&path).await {
109 Ok(schema) => {
110 let update_schema = UpdateSchema(SchemaState {
111 sdl: schema,
112 launch_id: None,
113 });
114 Some(update_schema)
115 }
116 Err(err) => {
117 tracing::error!(reason = %err, "failed to read supergraph schema");
118 None
119 }
120 }
121 }
122 })
123 .boxed()
124 } else {
125 let update_schema = UpdateSchema(SchemaState {
126 sdl: schema,
127 launch_id: None,
128 });
129 stream::once(future::ready(update_schema)).boxed()
130 }
131 }
132 Err(err) => {
133 tracing::error!(reason = %err, "failed to read supergraph schema");
134 stream::empty().boxed()
135 }
136 }
137 }
138 }
139 SchemaSource::Registry(uplink_config) => {
140 stream_from_uplink::<SupergraphSdlQuery, SchemaState>(uplink_config)
141 .filter_map(|res| {
142 future::ready(match res {
143 Ok(schema) => {
144 let update_schema = UpdateSchema(schema);
145 Some(update_schema)
146 }
147 Err(e) => {
148 tracing::error!("{}", e);
149 None
150 }
151 })
152 })
153 .boxed()
154 }
155 SchemaSource::URLs { urls } => {
156 futures::stream::once(async move {
157 fetch_supergraph_from_first_viable_url(&urls).await
158 })
159 .filter_map(|s| async move { s.map(Event::UpdateSchema) })
160 .boxed()
161 }
162 SchemaSource::OCI(oci_config) => {
163 tracing::debug!("using oci as schema source");
164 match create_oci_schema_stream(oci_config) {
165 Ok(stream) => Pin::new(Box::new(stream))
166 .filter_map(|res| {
167 future::ready(match res {
168 Ok(schema) => {
169 let update_schema = UpdateSchema(schema);
170 Some(update_schema)
171 }
172 Err(e) => {
173 tracing::error!("{}", e);
174 None
175 }
176 })
177 })
178 .boxed(),
179 Err(e) => {
180 tracing::error!("failed to create OCI schema stream: {}", e);
181 stream::empty().boxed()
182 }
183 }
184 }
185 }
186 .chain(stream::iter(vec![NoMoreSchema]))
187 .boxed()
188 }
189}
190
191async fn fetch_supergraph_from_first_viable_url(urls: &[Url]) -> Option<SchemaState> {
194 let Ok(client) = reqwest::Client::builder()
195 .no_gzip()
196 .timeout(Duration::from_secs(10))
197 .build()
198 else {
199 tracing::error!("failed to create HTTP client to fetch supergraph schema");
200 return None;
201 };
202 for url in urls {
203 match client
204 .get(reqwest::Url::parse(url.as_ref()).unwrap())
205 .send()
206 .await
207 {
208 Ok(res) if res.status().is_success() => match res.text().await {
209 Ok(schema) => {
210 return Some(SchemaState {
211 sdl: schema,
212 launch_id: None,
213 });
214 }
215 Err(err) => {
216 tracing::warn!(
217 url.full = %url,
218 reason = %err,
219 "failed to fetch supergraph schema"
220 )
221 }
222 },
223 Ok(res) => tracing::warn!(
224 http.response.status_code = res.status().as_u16(),
225 url.full = %url,
226 "failed to fetch supergraph schema"
227 ),
228 Err(err) => tracing::warn!(
229 url.full = %url,
230 reason = %err,
231 "failed to fetch supergraph schema"
232 ),
233 }
234 }
235 tracing::error!("failed to fetch supergraph schema from all urls");
236 None
237}
238
239#[cfg(test)]
240mod tests {
241 use std::env::temp_dir;
242
243 use test_log::test;
244 use tracing_futures::WithSubscriber;
245 use wiremock::Mock;
246 use wiremock::MockServer;
247 use wiremock::ResponseTemplate;
248 use wiremock::matchers::method;
249 use wiremock::matchers::path;
250
251 use super::*;
252 use crate::assert_snapshot_subscriber;
253 use crate::files::tests::create_temp_file;
254 use crate::files::tests::write_and_flush;
255
256 #[test(tokio::test)]
257 async fn schema_by_file_watching() {
258 let (path, mut file) = create_temp_file();
259 let schema = include_str!("../../testdata/supergraph.graphql");
260 write_and_flush(&mut file, schema).await;
261 let mut stream = SchemaSource::File { path, watch: true }
262 .into_stream()
263 .boxed();
264
265 assert!(matches!(stream.next().await.unwrap(), UpdateSchema(_)));
267
268 let schema_minimal = include_str!("../../testdata/minimal_supergraph.graphql");
270 write_and_flush(&mut file, schema_minimal).await;
272 assert!(matches!(stream.next().await.unwrap(), UpdateSchema(_)));
273 }
274
275 #[test(tokio::test)]
276 async fn schema_by_file_no_watch() {
277 let (path, mut file) = create_temp_file();
278 let schema = include_str!("../../testdata/supergraph.graphql");
279 write_and_flush(&mut file, schema).await;
280
281 let mut stream = SchemaSource::File { path, watch: false }.into_stream();
282 assert!(matches!(stream.next().await.unwrap(), UpdateSchema(_)));
283 assert!(matches!(stream.next().await.unwrap(), NoMoreSchema));
284 }
285
286 #[test(tokio::test)]
287 async fn schema_by_file_missing() {
288 let mut stream = SchemaSource::File {
289 path: temp_dir().join("does_not_exist"),
290 watch: true,
291 }
292 .into_stream();
293
294 assert!(matches!(stream.next().await.unwrap(), NoMoreSchema));
296 }
297
298 const SCHEMA_1: &str = "schema1";
299 const SCHEMA_2: &str = "schema2";
300 #[test(tokio::test)]
301 async fn schema_by_url() {
302 async {
303 let mock_server = MockServer::start().await;
304 Mock::given(method("GET"))
305 .and(path("/schema1"))
306 .respond_with(ResponseTemplate::new(200).set_body_string(SCHEMA_1))
307 .mount(&mock_server)
308 .await;
309 Mock::given(method("GET"))
310 .and(path("/schema2"))
311 .respond_with(ResponseTemplate::new(200).set_body_string(SCHEMA_2))
312 .mount(&mock_server)
313 .await;
314
315 let mut stream = SchemaSource::URLs {
316 urls: vec![
317 Url::parse(&format!("http://{}/schema1", mock_server.address())).unwrap(),
318 Url::parse(&format!("http://{}/schema2", mock_server.address())).unwrap(),
319 ],
320 }
321 .into_stream();
322
323 assert!(
324 matches!(stream.next().await.unwrap(), UpdateSchema(schema) if schema.sdl == SCHEMA_1)
325 );
326 assert!(matches!(stream.next().await.unwrap(), NoMoreSchema));
327 }
328 .with_subscriber(assert_snapshot_subscriber!())
329 .await;
330 }
331
332 #[test(tokio::test)]
333 async fn schema_by_url_fallback() {
334 async {
335 let mock_server = MockServer::start().await;
336 Mock::given(method("GET"))
337 .and(path("/schema1"))
338 .respond_with(ResponseTemplate::new(400))
339 .mount(&mock_server)
340 .await;
341 Mock::given(method("GET"))
342 .and(path("/schema2"))
343 .respond_with(ResponseTemplate::new(200).set_body_string(SCHEMA_2))
344 .mount(&mock_server)
345 .await;
346
347 let mut stream = SchemaSource::URLs {
348 urls: vec![
349 Url::parse(&format!("http://{}/schema1", mock_server.address())).unwrap(),
350 Url::parse(&format!("http://{}/schema2", mock_server.address())).unwrap(),
351 ],
352 }
353 .into_stream();
354
355 assert!(
356 matches!(stream.next().await.unwrap(), UpdateSchema(schema) if schema.sdl == SCHEMA_2)
357 );
358 assert!(matches!(stream.next().await.unwrap(), NoMoreSchema));
359 }
360 .with_subscriber(assert_snapshot_subscriber!({
361 "[].fields[\"url.full\"]" => "[url.full]"
362 }))
363 .await;
364 }
365
366 #[test(tokio::test)]
367 async fn schema_by_url_all_fail() {
368 async {
369 let mock_server = MockServer::start().await;
370 Mock::given(method("GET"))
371 .and(path("/schema1"))
372 .respond_with(ResponseTemplate::new(400))
373 .mount(&mock_server)
374 .await;
375 Mock::given(method("GET"))
376 .and(path("/schema2"))
377 .respond_with(ResponseTemplate::new(400))
378 .mount(&mock_server)
379 .await;
380
381 let mut stream = SchemaSource::URLs {
382 urls: vec![
383 Url::parse(&format!("http://{}/schema1", mock_server.address())).unwrap(),
384 Url::parse(&format!("http://{}/schema2", mock_server.address())).unwrap(),
385 ],
386 }
387 .into_stream();
388
389 assert!(matches!(stream.next().await.unwrap(), NoMoreSchema));
390 }
391 .with_subscriber(assert_snapshot_subscriber!({
392 "[].fields[\"url.full\"]" => "[url.full]"
393 }))
394 .await;
395 }
396}