apollo_router/router/event/
configuration.rs

1use std::path::Path;
2use std::path::PathBuf;
3use std::pin::Pin;
4use std::sync::Arc;
5
6use derivative::Derivative;
7use derive_more::Display;
8use derive_more::From;
9use futures::prelude::*;
10
11use crate::Configuration;
12use crate::router::Event;
13use crate::router::Event::NoMoreConfiguration;
14use crate::router::Event::RhaiReload;
15use crate::router::Event::UpdateConfiguration;
16use crate::uplink::UplinkConfig;
17
18type ConfigurationStream = Pin<Box<dyn Stream<Item = Configuration> + Send>>;
19
20/// The user supplied config. Either a static instance or a stream for hot reloading.
21#[derive(From, Display, Derivative)]
22#[derivative(Debug)]
23#[non_exhaustive]
24pub enum ConfigurationSource {
25    /// A static configuration.
26    ///
27    /// Can be created through `serde::Deserialize` from various formats,
28    /// or inline in Rust code with `serde_json::json!` and `serde_json::from_value`.
29    #[display("Static")]
30    #[from(Configuration, Box<Configuration>)]
31    Static(Box<Configuration>),
32
33    /// A configuration stream where the server will react to new configuration. If possible
34    /// the configuration will be applied without restarting the internal http server.
35    #[display("Stream")]
36    Stream(#[derivative(Debug = "ignore")] ConfigurationStream),
37
38    /// A yaml file that may be watched for changes
39    #[display("File")]
40    File {
41        /// The path of the configuration file.
42        path: PathBuf,
43
44        /// `true` to watch the file for changes and hot apply them.
45        watch: bool,
46    },
47}
48
49impl Default for ConfigurationSource {
50    fn default() -> Self {
51        ConfigurationSource::Static(Default::default())
52    }
53}
54
55impl ConfigurationSource {
56    /// Convert this config into a stream regardless of if is static or not. Allows for unified handling later.
57    pub(crate) fn into_stream(
58        self,
59        uplink_config: Option<UplinkConfig>,
60    ) -> impl Stream<Item = Event> {
61        match self {
62            ConfigurationSource::Static(mut instance) => {
63                instance.uplink = uplink_config;
64                stream::iter(vec![UpdateConfiguration(instance.into())]).boxed()
65            }
66            ConfigurationSource::Stream(stream) => stream
67                .map(move |mut c| {
68                    c.uplink = uplink_config.clone();
69                    UpdateConfiguration(Arc::new(c))
70                })
71                .boxed(),
72            ConfigurationSource::File { path, watch } => {
73                // Sanity check, does the config file exists, if it doesn't then bail.
74                if !path.exists() {
75                    tracing::error!(
76                        "configuration file at path '{}' does not exist.",
77                        path.to_string_lossy()
78                    );
79                    stream::empty().boxed()
80                } else {
81                    match ConfigurationSource::read_config(&path) {
82                        Ok(mut configuration) => {
83                            if watch {
84                                let config_watcher = crate::files::watch(&path)
85                                    .filter_map(move |_| {
86                                        let path = path.clone();
87                                        let uplink_config = uplink_config.clone();
88                                        async move {
89                                            match ConfigurationSource::read_config_async(&path)
90                                                .await
91                                            {
92                                                Ok(mut configuration) => {
93                                                    configuration.uplink = uplink_config.clone();
94                                                    Some(UpdateConfiguration(Arc::new(
95                                                        configuration,
96                                                    )))
97                                                }
98                                                Err(err) => {
99                                                    tracing::error!("{}", err);
100                                                    None
101                                                }
102                                            }
103                                        }
104                                    })
105                                    .boxed();
106                                if let Some(rhai_plugin) =
107                                    configuration.apollo_plugins.plugins.get("rhai")
108                                {
109                                    let scripts_path = match rhai_plugin["scripts"].as_str() {
110                                        Some(path) => Path::new(path),
111                                        None => Path::new("rhai"),
112                                    };
113                                    // If our path is relative, add it to the current dir
114                                    let scripts_watch = if scripts_path.is_relative() {
115                                        let current_directory = std::env::current_dir();
116                                        if current_directory.is_err() {
117                                            tracing::error!("No current directory found",);
118                                            return stream::empty().boxed();
119                                        }
120                                        current_directory.unwrap().join(scripts_path)
121                                    } else {
122                                        scripts_path.into()
123                                    };
124                                    let rhai_watcher = crate::files::watch_rhai(&scripts_watch)
125                                        .filter_map(move |_| future::ready(Some(RhaiReload)))
126                                        .boxed();
127                                    // Select across both our streams
128                                    futures::stream::select(config_watcher, rhai_watcher).boxed()
129                                } else {
130                                    config_watcher
131                                }
132                            } else {
133                                configuration.uplink = uplink_config.clone();
134                                stream::once(future::ready(UpdateConfiguration(Arc::new(
135                                    configuration,
136                                ))))
137                                .boxed()
138                            }
139                        }
140                        Err(err) => {
141                            tracing::error!("Failed to read configuration: {}", err);
142                            stream::empty().boxed()
143                        }
144                    }
145                }
146            }
147        }
148        .chain(stream::iter(vec![NoMoreConfiguration]))
149        .boxed()
150    }
151
152    fn read_config(path: &Path) -> Result<Configuration, ReadConfigError> {
153        let config = std::fs::read_to_string(path)?;
154        config.parse().map_err(ReadConfigError::Validation)
155    }
156    async fn read_config_async(path: &Path) -> Result<Configuration, ReadConfigError> {
157        let config = tokio::fs::read_to_string(path).await?;
158        config.parse().map_err(ReadConfigError::Validation)
159    }
160}
161
162#[derive(From, Display)]
163enum ReadConfigError {
164    /// could not read configuration: {0}
165    Io(std::io::Error),
166    /// {0}
167    Validation(crate::configuration::ConfigurationError),
168}
169
170#[cfg(test)]
171mod tests {
172    use std::env::temp_dir;
173
174    use futures::StreamExt;
175
176    use super::*;
177    use crate::files::tests::create_temp_file;
178    use crate::files::tests::write_and_flush;
179    use crate::uplink::UplinkConfig;
180
181    #[tokio::test(flavor = "multi_thread")]
182    async fn config_by_file_watching() {
183        let (path, mut file) = create_temp_file();
184        let contents = include_str!("../../testdata/supergraph_config.router.yaml");
185        write_and_flush(&mut file, contents).await;
186        let mut stream = ConfigurationSource::File { path, watch: true }
187            .into_stream(Some(UplinkConfig::default()))
188            .boxed();
189
190        // First update is guaranteed
191        assert!(matches!(
192            stream.next().await.unwrap(),
193            UpdateConfiguration(_)
194        ));
195
196        // Need different contents, since we won't get an event if content is the same
197        let contents_datadog = include_str!("../../testdata/datadog.router.yaml");
198        // Modify the file and try again
199        write_and_flush(&mut file, contents_datadog).await;
200        assert!(matches!(
201            stream.next().await.unwrap(),
202            UpdateConfiguration(_)
203        ));
204
205        // This time write garbage, there should not be an update.
206        write_and_flush(&mut file, ":garbage").await;
207        let event = StreamExt::into_future(stream).now_or_never();
208        assert!(event.is_none() || matches!(event, Some((Some(NoMoreConfiguration), _))));
209    }
210
211    #[tokio::test(flavor = "multi_thread")]
212    async fn config_by_file_missing() {
213        let mut stream = ConfigurationSource::File {
214            path: temp_dir().join("does_not_exit"),
215            watch: true,
216        }
217        .into_stream(Some(UplinkConfig::default()));
218
219        // First update fails because the file is invalid.
220        assert!(matches!(stream.next().await.unwrap(), NoMoreConfiguration));
221    }
222
223    #[tokio::test(flavor = "multi_thread")]
224    async fn config_by_file_invalid() {
225        let (path, mut file) = create_temp_file();
226        write_and_flush(&mut file, "Garbage").await;
227        let mut stream = ConfigurationSource::File { path, watch: true }
228            .into_stream(Some(UplinkConfig::default()));
229
230        // First update fails because the file is invalid.
231        assert!(matches!(stream.next().await.unwrap(), NoMoreConfiguration));
232    }
233
234    #[tokio::test(flavor = "multi_thread")]
235    async fn config_by_file_no_watch() {
236        let (path, mut file) = create_temp_file();
237        let contents = include_str!("../../testdata/supergraph_config.router.yaml");
238        write_and_flush(&mut file, contents).await;
239
240        let mut stream = ConfigurationSource::File { path, watch: false }
241            .into_stream(Some(UplinkConfig::default()));
242        assert!(matches!(
243            stream.next().await.unwrap(),
244            UpdateConfiguration(_)
245        ));
246        assert!(matches!(stream.next().await.unwrap(), NoMoreConfiguration));
247    }
248}