apollo_router/router/event/
configuration.rs1use std::path::Path;
2use std::path::PathBuf;
3use std::pin::Pin;
4use std::sync::Arc;
5
6use derivative::Derivative;
7use derive_more::Display;
8use derive_more::From;
9use futures::prelude::*;
10
11use crate::Configuration;
12use crate::router::Event;
13use crate::router::Event::NoMoreConfiguration;
14use crate::router::Event::RhaiReload;
15use crate::router::Event::UpdateConfiguration;
16use crate::uplink::UplinkConfig;
17
18type ConfigurationStream = Pin<Box<dyn Stream<Item = Configuration> + Send>>;
19
20#[derive(From, Display, Derivative)]
22#[derivative(Debug)]
23#[non_exhaustive]
24pub enum ConfigurationSource {
25 #[display("Static")]
30 #[from(Configuration, Box<Configuration>)]
31 Static(Box<Configuration>),
32
33 #[display("Stream")]
36 Stream(#[derivative(Debug = "ignore")] ConfigurationStream),
37
38 #[display("File")]
40 File {
41 path: PathBuf,
43
44 watch: bool,
46 },
47}
48
49impl Default for ConfigurationSource {
50 fn default() -> Self {
51 ConfigurationSource::Static(Default::default())
52 }
53}
54
55impl ConfigurationSource {
56 pub(crate) fn into_stream(
58 self,
59 uplink_config: Option<UplinkConfig>,
60 ) -> impl Stream<Item = Event> {
61 match self {
62 ConfigurationSource::Static(mut instance) => {
63 instance.uplink = uplink_config;
64 stream::iter(vec![UpdateConfiguration(instance.into())]).boxed()
65 }
66 ConfigurationSource::Stream(stream) => stream
67 .map(move |mut c| {
68 c.uplink = uplink_config.clone();
69 UpdateConfiguration(Arc::new(c))
70 })
71 .boxed(),
72 ConfigurationSource::File { path, watch } => {
73 if !path.exists() {
75 tracing::error!(
76 "configuration file at path '{}' does not exist.",
77 path.to_string_lossy()
78 );
79 stream::empty().boxed()
80 } else {
81 match ConfigurationSource::read_config(&path) {
82 Ok(mut configuration) => {
83 if watch {
84 let config_watcher = crate::files::watch(&path)
85 .filter_map(move |_| {
86 let path = path.clone();
87 let uplink_config = uplink_config.clone();
88 async move {
89 match ConfigurationSource::read_config_async(&path)
90 .await
91 {
92 Ok(mut configuration) => {
93 configuration.uplink = uplink_config.clone();
94 Some(UpdateConfiguration(Arc::new(
95 configuration,
96 )))
97 }
98 Err(err) => {
99 tracing::error!("{}", err);
100 None
101 }
102 }
103 }
104 })
105 .boxed();
106 if let Some(rhai_plugin) =
107 configuration.apollo_plugins.plugins.get("rhai")
108 {
109 let scripts_path = match rhai_plugin["scripts"].as_str() {
110 Some(path) => Path::new(path),
111 None => Path::new("rhai"),
112 };
113 let scripts_watch = if scripts_path.is_relative() {
115 let current_directory = std::env::current_dir();
116 if current_directory.is_err() {
117 tracing::error!("No current directory found",);
118 return stream::empty().boxed();
119 }
120 current_directory.unwrap().join(scripts_path)
121 } else {
122 scripts_path.into()
123 };
124 let rhai_watcher = crate::files::watch_rhai(&scripts_watch)
125 .filter_map(move |_| future::ready(Some(RhaiReload)))
126 .boxed();
127 futures::stream::select(config_watcher, rhai_watcher).boxed()
129 } else {
130 config_watcher
131 }
132 } else {
133 configuration.uplink = uplink_config.clone();
134 stream::once(future::ready(UpdateConfiguration(Arc::new(
135 configuration,
136 ))))
137 .boxed()
138 }
139 }
140 Err(err) => {
141 tracing::error!("Failed to read configuration: {}", err);
142 stream::empty().boxed()
143 }
144 }
145 }
146 }
147 }
148 .chain(stream::iter(vec![NoMoreConfiguration]))
149 .boxed()
150 }
151
152 fn read_config(path: &Path) -> Result<Configuration, ReadConfigError> {
153 let config = std::fs::read_to_string(path)?;
154 config.parse().map_err(ReadConfigError::Validation)
155 }
156 async fn read_config_async(path: &Path) -> Result<Configuration, ReadConfigError> {
157 let config = tokio::fs::read_to_string(path).await?;
158 config.parse().map_err(ReadConfigError::Validation)
159 }
160}
161
162#[derive(From, Display)]
163enum ReadConfigError {
164 Io(std::io::Error),
166 Validation(crate::configuration::ConfigurationError),
168}
169
#[cfg(test)]
mod tests {
    use std::env::temp_dir;

    use futures::StreamExt;

    use super::*;
    use crate::files::tests::create_temp_file;
    use crate::files::tests::write_and_flush;
    use crate::uplink::UplinkConfig;

    /// A watched file produces an update for the initial contents and for a
    /// subsequent valid rewrite, but not for invalid contents.
    #[tokio::test(flavor = "multi_thread")]
    async fn config_by_file_watching() {
        let (path, mut file) = create_temp_file();
        let initial = include_str!("../../testdata/supergraph_config.router.yaml");
        write_and_flush(&mut file, initial).await;

        let source = ConfigurationSource::File { path, watch: true };
        let mut events = source.into_stream(Some(UplinkConfig::default())).boxed();

        // The initial contents must produce an update.
        let first = events.next().await.unwrap();
        assert!(matches!(first, UpdateConfiguration(_)));

        // Rewrite the file with another valid configuration and expect a
        // second update.
        let replacement = include_str!("../../testdata/datadog.router.yaml");
        write_and_flush(&mut file, replacement).await;
        let second = events.next().await.unwrap();
        assert!(matches!(second, UpdateConfiguration(_)));

        // Invalid contents must not produce an update: either nothing is
        // ready yet, or the stream has finished.
        write_and_flush(&mut file, ":garbage").await;
        let outcome = StreamExt::into_future(events).now_or_never();
        assert!(outcome.is_none() || matches!(outcome, Some((Some(NoMoreConfiguration), _))));
    }

    /// A path that does not exist yields only the terminating event.
    #[tokio::test(flavor = "multi_thread")]
    async fn config_by_file_missing() {
        let source = ConfigurationSource::File {
            path: temp_dir().join("does_not_exit"),
            watch: true,
        };
        let mut events = source.into_stream(Some(UplinkConfig::default()));

        let only = events.next().await.unwrap();
        assert!(matches!(only, NoMoreConfiguration));
    }

    /// A file whose initial contents are invalid yields only the terminating
    /// event.
    #[tokio::test(flavor = "multi_thread")]
    async fn config_by_file_invalid() {
        let (path, mut file) = create_temp_file();
        write_and_flush(&mut file, "Garbage").await;

        let source = ConfigurationSource::File { path, watch: true };
        let mut events = source.into_stream(Some(UplinkConfig::default()));

        let only = events.next().await.unwrap();
        assert!(matches!(only, NoMoreConfiguration));
    }

    /// Without watching, a valid file yields one update followed by the
    /// terminating event.
    #[tokio::test(flavor = "multi_thread")]
    async fn config_by_file_no_watch() {
        let (path, mut file) = create_temp_file();
        let initial = include_str!("../../testdata/supergraph_config.router.yaml");
        write_and_flush(&mut file, initial).await;

        let source = ConfigurationSource::File { path, watch: false };
        let mut events = source.into_stream(Some(UplinkConfig::default()));

        let first = events.next().await.unwrap();
        assert!(matches!(first, UpdateConfiguration(_)));

        let last = events.next().await.unwrap();
        assert!(matches!(last, NoMoreConfiguration));
    }
}