apollo_router/router/event/
configuration.rs1use std::path::Path;
2use std::path::PathBuf;
3use std::pin::Pin;
4use std::sync::Arc;
5use std::time::Duration;
6
7use derivative::Derivative;
8use derive_more::Display;
9use derive_more::From;
10use futures::prelude::*;
11
12use crate::Configuration;
13use crate::router::Event;
14use crate::router::Event::NoMoreConfiguration;
15use crate::router::Event::UpdateConfiguration;
16use crate::uplink::UplinkConfig;
17
18type ConfigurationStream = Pin<Box<dyn Stream<Item = Configuration> + Send>>;
19
20#[derive(From, Display, Derivative)]
22#[derivative(Debug)]
23#[non_exhaustive]
24pub enum ConfigurationSource {
25 #[display(fmt = "Static")]
30 #[from(types(Configuration))]
31 Static(Box<Configuration>),
32
33 #[display(fmt = "Stream")]
36 Stream(#[derivative(Debug = "ignore")] ConfigurationStream),
37
38 #[display(fmt = "File")]
40 File {
41 path: PathBuf,
43
44 watch: bool,
46
47 #[deprecated]
50 delay: Option<Duration>,
51 },
52}
53
54impl Default for ConfigurationSource {
55 fn default() -> Self {
56 ConfigurationSource::Static(Default::default())
57 }
58}
59
60impl ConfigurationSource {
61 pub(crate) fn into_stream(
63 self,
64 uplink_config: Option<UplinkConfig>,
65 ) -> impl Stream<Item = Event> {
66 match self {
67 ConfigurationSource::Static(mut instance) => {
68 instance.uplink = uplink_config;
69 stream::iter(vec![UpdateConfiguration(Arc::new(*instance))]).boxed()
70 }
71 ConfigurationSource::Stream(stream) => stream
72 .map(move |mut c| {
73 c.uplink = uplink_config.clone();
74 UpdateConfiguration(Arc::new(c))
75 })
76 .boxed(),
77 #[allow(deprecated)]
78 ConfigurationSource::File {
79 path,
80 watch,
81 delay: _,
82 } => {
83 if !path.exists() {
85 tracing::error!(
86 "configuration file at path '{}' does not exist.",
87 path.to_string_lossy()
88 );
89 stream::empty().boxed()
90 } else {
91 match ConfigurationSource::read_config(&path) {
92 Ok(mut configuration) => {
93 if watch {
94 crate::files::watch(&path)
95 .filter_map(move |_| {
96 let path = path.clone();
97 let uplink_config = uplink_config.clone();
98 async move {
99 match ConfigurationSource::read_config_async(&path)
100 .await
101 {
102 Ok(mut configuration) => {
103 configuration.uplink = uplink_config.clone();
104 Some(UpdateConfiguration(Arc::new(
105 configuration,
106 )))
107 }
108 Err(err) => {
109 tracing::error!("{}", err);
110 None
111 }
112 }
113 }
114 })
115 .boxed()
116 } else {
117 configuration.uplink = uplink_config.clone();
118 stream::once(future::ready(UpdateConfiguration(Arc::new(
119 configuration,
120 ))))
121 .boxed()
122 }
123 }
124 Err(err) => {
125 tracing::error!("Failed to read configuration: {}", err);
126 stream::empty().boxed()
127 }
128 }
129 }
130 }
131 }
132 .chain(stream::iter(vec![NoMoreConfiguration]))
133 .boxed()
134 }
135
136 fn read_config(path: &Path) -> Result<Configuration, ReadConfigError> {
137 let config = std::fs::read_to_string(path)?;
138 config.parse().map_err(ReadConfigError::Validation)
139 }
140 async fn read_config_async(path: &Path) -> Result<Configuration, ReadConfigError> {
141 let config = tokio::fs::read_to_string(path).await?;
142 config.parse().map_err(ReadConfigError::Validation)
143 }
144}
145
146#[derive(From, Display)]
147enum ReadConfigError {
148 Io(std::io::Error),
150 Validation(crate::configuration::ConfigurationError),
152}
153
#[cfg(test)]
mod tests {
    use std::env::temp_dir;

    use super::*;
    use crate::files::tests::create_temp_file;
    use crate::files::tests::write_and_flush;
    use crate::uplink::UplinkConfig;

    /// A watched file emits an update for its initial contents and for each valid rewrite;
    /// an invalid rewrite must not produce an update.
    #[tokio::test(flavor = "multi_thread")]
    async fn config_by_file_watching() {
        let (path, mut file) = create_temp_file();
        write_and_flush(
            &mut file,
            include_str!("../../testdata/supergraph_config.router.yaml"),
        )
        .await;

        let source = ConfigurationSource::File {
            path,
            watch: true,
            delay: None,
        };
        let mut events = source.into_stream(Some(UplinkConfig::default())).boxed();

        // The initial contents of the file produce the first update.
        let first = events.next().await.unwrap();
        assert!(matches!(first, UpdateConfiguration(_)));

        // Rewriting the file with another valid configuration produces a second update.
        write_and_flush(
            &mut file,
            include_str!("../../testdata/datadog.router.yaml"),
        )
        .await;
        let second = events.next().await.unwrap();
        assert!(matches!(second, UpdateConfiguration(_)));

        // Invalid contents: either nothing is ready yet, or the stream has already ended.
        write_and_flush(&mut file, ":garbage").await;
        let outcome = events.into_future().now_or_never();
        assert!(outcome.is_none() || matches!(outcome, Some((Some(NoMoreConfiguration), _))));
    }

    /// A path that does not exist yields only the terminating event.
    #[tokio::test(flavor = "multi_thread")]
    async fn config_by_file_missing() {
        let source = ConfigurationSource::File {
            path: temp_dir().join("does_not_exit"),
            watch: true,
            delay: None,
        };
        let mut events = source.into_stream(Some(UplinkConfig::default()));

        let only = events.next().await.unwrap();
        assert!(matches!(only, NoMoreConfiguration));
    }

    /// A file whose initial contents cannot be parsed yields only the terminating event.
    #[tokio::test(flavor = "multi_thread")]
    async fn config_by_file_invalid() {
        let (path, mut file) = create_temp_file();
        write_and_flush(&mut file, "Garbage").await;

        let source = ConfigurationSource::File {
            path,
            watch: true,
            delay: None,
        };
        let mut events = source.into_stream(Some(UplinkConfig::default()));

        let only = events.next().await.unwrap();
        assert!(matches!(only, NoMoreConfiguration));
    }

    /// Without watching, a valid file yields one update followed by the terminating event.
    #[tokio::test(flavor = "multi_thread")]
    async fn config_by_file_no_watch() {
        let (path, mut file) = create_temp_file();
        write_and_flush(
            &mut file,
            include_str!("../../testdata/supergraph_config.router.yaml"),
        )
        .await;

        let source = ConfigurationSource::File {
            path,
            watch: false,
            delay: None,
        };
        let mut events = source.into_stream(Some(UplinkConfig::default()));

        let first = events.next().await.unwrap();
        assert!(matches!(first, UpdateConfiguration(_)));

        let last = events.next().await.unwrap();
        assert!(matches!(last, NoMoreConfiguration));
    }
}