Skip to main content

apollo_router/router/event/
configuration.rs

1use std::path::Path;
2use std::path::PathBuf;
3use std::pin::Pin;
4use std::sync::Arc;
5use std::time::Duration;
6
7use derivative::Derivative;
8use derive_more::Display;
9use derive_more::From;
10use futures::prelude::*;
11
12use crate::Configuration;
13use crate::router::Event;
14use crate::router::Event::NoMoreConfiguration;
15use crate::router::Event::UpdateConfiguration;
16use crate::uplink::UplinkConfig;
17
18type ConfigurationStream = Pin<Box<dyn Stream<Item = Configuration> + Send>>;
19
20/// The user supplied config. Either a static instance or a stream for hot reloading.
21#[derive(From, Display, Derivative)]
22#[derivative(Debug)]
23#[non_exhaustive]
24pub enum ConfigurationSource {
25    /// A static configuration.
26    ///
27    /// Can be created through `serde::Deserialize` from various formats,
28    /// or inline in Rust code with `serde_json::json!` and `serde_json::from_value`.
29    #[display(fmt = "Static")]
30    #[from(types(Configuration))]
31    Static(Box<Configuration>),
32
33    /// A configuration stream where the server will react to new configuration. If possible
34    /// the configuration will be applied without restarting the internal http server.
35    #[display(fmt = "Stream")]
36    Stream(#[derivative(Debug = "ignore")] ConfigurationStream),
37
38    /// A yaml file that may be watched for changes
39    #[display(fmt = "File")]
40    File {
41        /// The path of the configuration file.
42        path: PathBuf,
43
44        /// `true` to watch the file for changes and hot apply them.
45        watch: bool,
46
47        /// When watching, the delay to wait before applying the new configuration.
48        /// Note: This variable is deprecated and has no effect.
49        #[deprecated]
50        delay: Option<Duration>,
51    },
52}
53
54impl Default for ConfigurationSource {
55    fn default() -> Self {
56        ConfigurationSource::Static(Default::default())
57    }
58}
59
60impl ConfigurationSource {
61    /// Convert this config into a stream regardless of if is static or not. Allows for unified handling later.
62    pub(crate) fn into_stream(
63        self,
64        uplink_config: Option<UplinkConfig>,
65    ) -> impl Stream<Item = Event> {
66        match self {
67            ConfigurationSource::Static(mut instance) => {
68                instance.uplink = uplink_config;
69                stream::iter(vec![UpdateConfiguration(Arc::new(*instance))]).boxed()
70            }
71            ConfigurationSource::Stream(stream) => stream
72                .map(move |mut c| {
73                    c.uplink = uplink_config.clone();
74                    UpdateConfiguration(Arc::new(c))
75                })
76                .boxed(),
77            #[allow(deprecated)]
78            ConfigurationSource::File {
79                path,
80                watch,
81                delay: _,
82            } => {
83                // Sanity check, does the config file exists, if it doesn't then bail.
84                if !path.exists() {
85                    tracing::error!(
86                        "configuration file at path '{}' does not exist.",
87                        path.to_string_lossy()
88                    );
89                    stream::empty().boxed()
90                } else {
91                    match ConfigurationSource::read_config(&path) {
92                        Ok(mut configuration) => {
93                            if watch {
94                                crate::files::watch(&path)
95                                    .filter_map(move |_| {
96                                        let path = path.clone();
97                                        let uplink_config = uplink_config.clone();
98                                        async move {
99                                            match ConfigurationSource::read_config_async(&path)
100                                                .await
101                                            {
102                                                Ok(mut configuration) => {
103                                                    configuration.uplink = uplink_config.clone();
104                                                    Some(UpdateConfiguration(Arc::new(
105                                                        configuration,
106                                                    )))
107                                                }
108                                                Err(err) => {
109                                                    tracing::error!("{}", err);
110                                                    None
111                                                }
112                                            }
113                                        }
114                                    })
115                                    .boxed()
116                            } else {
117                                configuration.uplink = uplink_config.clone();
118                                stream::once(future::ready(UpdateConfiguration(Arc::new(
119                                    configuration,
120                                ))))
121                                .boxed()
122                            }
123                        }
124                        Err(err) => {
125                            tracing::error!("Failed to read configuration: {}", err);
126                            stream::empty().boxed()
127                        }
128                    }
129                }
130            }
131        }
132        .chain(stream::iter(vec![NoMoreConfiguration]))
133        .boxed()
134    }
135
136    fn read_config(path: &Path) -> Result<Configuration, ReadConfigError> {
137        let config = std::fs::read_to_string(path)?;
138        config.parse().map_err(ReadConfigError::Validation)
139    }
140    async fn read_config_async(path: &Path) -> Result<Configuration, ReadConfigError> {
141        let config = tokio::fs::read_to_string(path).await?;
142        config.parse().map_err(ReadConfigError::Validation)
143    }
144}
145
146#[derive(From, Display)]
147enum ReadConfigError {
148    /// could not read configuration: {0}
149    Io(std::io::Error),
150    /// {0}
151    Validation(crate::configuration::ConfigurationError),
152}
153
154#[cfg(test)]
155mod tests {
156    use std::env::temp_dir;
157
158    use super::*;
159    use crate::files::tests::create_temp_file;
160    use crate::files::tests::write_and_flush;
161    use crate::uplink::UplinkConfig;
162
163    #[tokio::test(flavor = "multi_thread")]
164    async fn config_by_file_watching() {
165        let (path, mut file) = create_temp_file();
166        let contents = include_str!("../../testdata/supergraph_config.router.yaml");
167        write_and_flush(&mut file, contents).await;
168        let mut stream = ConfigurationSource::File {
169            path,
170            watch: true,
171            delay: None,
172        }
173        .into_stream(Some(UplinkConfig::default()))
174        .boxed();
175
176        // First update is guaranteed
177        assert!(matches!(
178            stream.next().await.unwrap(),
179            UpdateConfiguration(_)
180        ));
181
182        // Need different contents, since we won't get an event if content is the same
183        let contents_datadog = include_str!("../../testdata/datadog.router.yaml");
184        // Modify the file and try again
185        write_and_flush(&mut file, contents_datadog).await;
186        assert!(matches!(
187            stream.next().await.unwrap(),
188            UpdateConfiguration(_)
189        ));
190
191        // This time write garbage, there should not be an update.
192        write_and_flush(&mut file, ":garbage").await;
193        let event = stream.into_future().now_or_never();
194        assert!(event.is_none() || matches!(event, Some((Some(NoMoreConfiguration), _))));
195    }
196
197    #[tokio::test(flavor = "multi_thread")]
198    async fn config_by_file_missing() {
199        let mut stream = ConfigurationSource::File {
200            path: temp_dir().join("does_not_exit"),
201            watch: true,
202            delay: None,
203        }
204        .into_stream(Some(UplinkConfig::default()));
205
206        // First update fails because the file is invalid.
207        assert!(matches!(stream.next().await.unwrap(), NoMoreConfiguration));
208    }
209
210    #[tokio::test(flavor = "multi_thread")]
211    async fn config_by_file_invalid() {
212        let (path, mut file) = create_temp_file();
213        write_and_flush(&mut file, "Garbage").await;
214        let mut stream = ConfigurationSource::File {
215            path,
216            watch: true,
217            delay: None,
218        }
219        .into_stream(Some(UplinkConfig::default()));
220
221        // First update fails because the file is invalid.
222        assert!(matches!(stream.next().await.unwrap(), NoMoreConfiguration));
223    }
224
225    #[tokio::test(flavor = "multi_thread")]
226    async fn config_by_file_no_watch() {
227        let (path, mut file) = create_temp_file();
228        let contents = include_str!("../../testdata/supergraph_config.router.yaml");
229        write_and_flush(&mut file, contents).await;
230
231        let mut stream = ConfigurationSource::File {
232            path,
233            watch: false,
234            delay: None,
235        }
236        .into_stream(Some(UplinkConfig::default()));
237        assert!(matches!(
238            stream.next().await.unwrap(),
239            UpdateConfiguration(_)
240        ));
241        assert!(matches!(stream.next().await.unwrap(), NoMoreConfiguration));
242    }
243}