Skip to main content

liminal_server/config/
env.rs

1use std::ffi::{OsStr, OsString};
2use std::net::SocketAddr;
3use std::path::PathBuf;
4
5use crate::ServerError;
6
7use super::types::ServerConfig;
8
9const ENV_PREFIX: &str = "LIMINAL_";
10const LISTEN_ADDRESS: &str = "LIMINAL_LISTEN_ADDRESS";
11const HEALTH_LISTEN_ADDRESS: &str = "LIMINAL_HEALTH_LISTEN_ADDRESS";
12const DRAIN_TIMEOUT_MS: &str = "LIMINAL_DRAIN_TIMEOUT_MS";
13const PERSISTENCE_PATH: &str = "LIMINAL_PERSISTENCE_PATH";
14const CLUSTER_NODE_NAME: &str = "LIMINAL_CLUSTER_NODE_NAME";
15const CLUSTER_SEED_NODES: &str = "LIMINAL_CLUSTER_SEED_NODES";
16const CLUSTER_LISTEN_ADDRESS: &str = "LIMINAL_CLUSTER_LISTEN_ADDRESS";
17const CLUSTER_COOKIE: &str = "LIMINAL_CLUSTER_COOKIE";
18
19/// Applies supported `LIMINAL_` environment variable overrides to a config.
20///
21/// Absent variables leave the supplied configuration unchanged. Supported
22/// variables are `LIMINAL_LISTEN_ADDRESS`, `LIMINAL_HEALTH_LISTEN_ADDRESS`,
23/// `LIMINAL_DRAIN_TIMEOUT_MS`, `LIMINAL_PERSISTENCE_PATH`,
24/// `LIMINAL_CLUSTER_NODE_NAME`, `LIMINAL_CLUSTER_SEED_NODES`,
25/// `LIMINAL_CLUSTER_LISTEN_ADDRESS`, and `LIMINAL_CLUSTER_COOKIE`.
26///
27/// # Errors
28///
29/// Returns [`ServerError`] when a present environment variable cannot be parsed
30/// or targets cluster configuration that was not declared in the file.
31pub fn apply_env_overrides(config: ServerConfig) -> Result<ServerConfig, ServerError> {
32    apply_env_overrides_from(config, std::env::vars_os())
33}
34
35pub(crate) fn apply_env_overrides_from<I>(
36    mut config: ServerConfig,
37    variables: I,
38) -> Result<ServerConfig, ServerError>
39where
40    I: IntoIterator<Item = (OsString, OsString)>,
41{
42    for (key, value) in variables {
43        let Some(key) = key.to_str() else {
44            continue;
45        };
46
47        if !key.starts_with(ENV_PREFIX) {
48            continue;
49        }
50
51        match key {
52            LISTEN_ADDRESS => {
53                config.listen_address = parse_socket_addr(LISTEN_ADDRESS, &value)?;
54            }
55            HEALTH_LISTEN_ADDRESS => {
56                config.health_listen_address = parse_socket_addr(HEALTH_LISTEN_ADDRESS, &value)?;
57            }
58            DRAIN_TIMEOUT_MS => {
59                config.drain_timeout_ms = parse_u64(DRAIN_TIMEOUT_MS, &value)?;
60            }
61            PERSISTENCE_PATH => {
62                config.persistence_path = Some(PathBuf::from(value));
63            }
64            CLUSTER_NODE_NAME => {
65                let node_name = env_string(CLUSTER_NODE_NAME, &value)?;
66                cluster_required(&mut config, CLUSTER_NODE_NAME)?.node_name = node_name;
67            }
68            CLUSTER_SEED_NODES => {
69                let seed_nodes = parse_seed_nodes(&value)?;
70                cluster_required(&mut config, CLUSTER_SEED_NODES)?.seed_nodes = seed_nodes;
71            }
72            CLUSTER_LISTEN_ADDRESS => {
73                let listen_address = parse_socket_addr(CLUSTER_LISTEN_ADDRESS, &value)?;
74                cluster_required(&mut config, CLUSTER_LISTEN_ADDRESS)?.listen_address =
75                    listen_address;
76            }
77            CLUSTER_COOKIE => {
78                let cookie = env_string(CLUSTER_COOKIE, &value)?;
79                cluster_required(&mut config, CLUSTER_COOKIE)?.cookie = cookie;
80            }
81            _ => {}
82        }
83    }
84
85    Ok(config)
86}
87
88fn parse_socket_addr(name: &str, value: &OsStr) -> Result<SocketAddr, ServerError> {
89    let value = env_string(name, value)?;
90    value.parse::<SocketAddr>().map_err(|error| {
91        config_load(format!(
92            "environment variable {name} must be a socket address: {error}"
93        ))
94    })
95}
96
97fn parse_u64(name: &str, value: &OsStr) -> Result<u64, ServerError> {
98    let value = env_string(name, value)?;
99    value.parse::<u64>().map_err(|error| {
100        config_load(format!(
101            "environment variable {name} must be an unsigned integer: {error}"
102        ))
103    })
104}
105
106fn parse_seed_nodes(value: &OsStr) -> Result<Vec<SocketAddr>, ServerError> {
107    let value = env_string(CLUSTER_SEED_NODES, value)?;
108    if value.trim().is_empty() {
109        return Ok(Vec::new());
110    }
111
112    value
113        .split(',')
114        .enumerate()
115        .map(|(index, candidate)| parse_seed_node(index, candidate))
116        .collect()
117}
118
119fn parse_seed_node(index: usize, candidate: &str) -> Result<SocketAddr, ServerError> {
120    let candidate = candidate.trim();
121    if candidate.is_empty() {
122        return Err(config_load(format!(
123            "environment variable {CLUSTER_SEED_NODES} contains an empty seed node at position {}",
124            index + 1
125        )));
126    }
127
128    candidate.parse::<SocketAddr>().map_err(|error| {
129        config_load(format!(
130            "environment variable {CLUSTER_SEED_NODES} contains invalid seed node '{}' at position {}: {error}",
131            candidate,
132            index + 1
133        ))
134    })
135}
136
137fn env_string(name: &str, value: &OsStr) -> Result<String, ServerError> {
138    value.to_str().map(str::to_owned).ok_or_else(|| {
139        config_load(format!(
140            "environment variable {name} contains non-Unicode data"
141        ))
142    })
143}
144
145fn cluster_required<'a>(
146    config: &'a mut ServerConfig,
147    name: &str,
148) -> Result<&'a mut super::types::ClusterConfig, ServerError> {
149    config
150        .cluster
151        .as_mut()
152        .ok_or_else(|| ServerError::ConfigValidation {
153            message: format!(
154                "environment variable {name} requires a [cluster] section in the configuration file"
155            ),
156        })
157}
158
159const fn config_load(message: String) -> ServerError {
160    ServerError::ConfigLoad { message }
161}
162
163#[cfg(test)]
164mod tests {
165    use std::ffi::OsString;
166    use std::net::SocketAddr;
167    use std::path::{Path, PathBuf};
168
169    use crate::ServerError;
170
171    use super::apply_env_overrides_from;
172    use crate::config::types::{ChannelDef, ClusterConfig, RoutingRuleDef, ServerConfig};
173    use crate::config::{load_from_file, validate};
174
175    fn socket(address: &str) -> Result<SocketAddr, Box<dyn std::error::Error>> {
176        Ok(address.parse()?)
177    }
178
179    fn sample_config() -> Result<ServerConfig, Box<dyn std::error::Error>> {
180        Ok(ServerConfig {
181            listen_address: socket("127.0.0.1:8080")?,
182            health_listen_address: socket("127.0.0.1:8081")?,
183            drain_timeout_ms: 30_000,
184            channels: vec![ChannelDef {
185                name: "orders".to_owned(),
186                schema_ref: "schemas/orders.json".to_owned(),
187                durable: true,
188            }],
189            routing_rules: vec![RoutingRuleDef {
190                source_channel: "orders".to_owned(),
191                target_channel: "orders".to_owned(),
192                predicate: None,
193            }],
194            persistence_path: Some(PathBuf::from("/tmp")),
195            cluster: Some(ClusterConfig {
196                node_name: "node-a".to_owned(),
197                listen_address: socket("127.0.0.1:9000")?,
198                seed_nodes: vec![socket("127.0.0.1:9001")?],
199                cookie: "test-cookie".to_owned(),
200            }),
201        })
202    }
203
204    fn env_pair(name: &str, value: &str) -> (OsString, OsString) {
205        (OsString::from(name), OsString::from(value))
206    }
207
208    fn write_temp_config(contents: &str) -> Result<PathBuf, Box<dyn std::error::Error>> {
209        let path = std::env::temp_dir().join(format!(
210            "liminal-server-env-pipeline-{}.toml",
211            std::process::id()
212        ));
213        std::fs::write(&path, contents)?;
214        Ok(path)
215    }
216
217    fn remove_temp_file(path: &Path) -> Result<(), Box<dyn std::error::Error>> {
218        if path.exists() {
219            std::fs::remove_file(path)?;
220        }
221        Ok(())
222    }
223
224    #[test]
225    fn listen_address_override_replaces_file_value() -> Result<(), Box<dyn std::error::Error>> {
226        let config = sample_config()?;
227        let config = apply_env_overrides_from(
228            config,
229            vec![env_pair("LIMINAL_LISTEN_ADDRESS", "0.0.0.0:9090")],
230        )?;
231
232        assert_eq!(config.listen_address, socket("0.0.0.0:9090")?);
233
234        Ok(())
235    }
236
237    #[test]
238    fn health_listen_address_override_replaces_file_value() -> Result<(), Box<dyn std::error::Error>>
239    {
240        let config = sample_config()?;
241        let config = apply_env_overrides_from(
242            config,
243            vec![env_pair("LIMINAL_HEALTH_LISTEN_ADDRESS", "0.0.0.0:9191")],
244        )?;
245
246        assert_eq!(config.health_listen_address, socket("0.0.0.0:9191")?);
247
248        Ok(())
249    }
250
251    #[test]
252    fn drain_timeout_override_replaces_file_value() -> Result<(), Box<dyn std::error::Error>> {
253        let config = sample_config()?;
254        let config =
255            apply_env_overrides_from(config, vec![env_pair("LIMINAL_DRAIN_TIMEOUT_MS", "1250")])?;
256
257        assert_eq!(config.drain_timeout_ms, 1250);
258
259        Ok(())
260    }
261
262    #[test]
263    fn persistence_path_override_replaces_file_value() -> Result<(), Box<dyn std::error::Error>> {
264        let config = sample_config()?;
265        let config = apply_env_overrides_from(
266            config,
267            vec![env_pair("LIMINAL_PERSISTENCE_PATH", "/var/lib/liminal")],
268        )?;
269
270        assert_eq!(
271            config.persistence_path.as_deref(),
272            Some(Path::new("/var/lib/liminal"))
273        );
274
275        Ok(())
276    }
277
278    #[test]
279    fn cluster_overrides_replace_existing_cluster_values() -> Result<(), Box<dyn std::error::Error>>
280    {
281        let config = sample_config()?;
282        let config = apply_env_overrides_from(
283            config,
284            vec![
285                env_pair("LIMINAL_CLUSTER_NODE_NAME", "node-b"),
286                env_pair(
287                    "LIMINAL_CLUSTER_SEED_NODES",
288                    "127.0.0.1:9100, 127.0.0.1:9200",
289                ),
290            ],
291        )?;
292
293        let Some(cluster) = config.cluster else {
294            return Err("cluster config should remain present".into());
295        };
296        assert_eq!(cluster.node_name, "node-b");
297        assert_eq!(cluster.seed_nodes.len(), 2);
298        assert_eq!(cluster.seed_nodes[0], socket("127.0.0.1:9100")?);
299        assert_eq!(cluster.seed_nodes[1], socket("127.0.0.1:9200")?);
300
301        Ok(())
302    }
303
304    #[test]
305    fn cluster_listen_address_and_cookie_overrides_replace_values()
306    -> Result<(), Box<dyn std::error::Error>> {
307        let config = sample_config()?;
308        let config = apply_env_overrides_from(
309            config,
310            vec![
311                env_pair("LIMINAL_CLUSTER_LISTEN_ADDRESS", "127.0.0.1:9500"),
312                env_pair("LIMINAL_CLUSTER_COOKIE", "override-cookie"),
313            ],
314        )?;
315
316        let Some(cluster) = config.cluster else {
317            return Err("cluster config should remain present".into());
318        };
319        assert_eq!(cluster.listen_address, socket("127.0.0.1:9500")?);
320        assert_eq!(cluster.cookie, "override-cookie");
321
322        Ok(())
323    }
324
325    #[test]
326    fn cluster_listen_address_override_without_cluster_section_returns_validation_error()
327    -> Result<(), Box<dyn std::error::Error>> {
328        let mut config = sample_config()?;
329        config.cluster = None;
330        let result = apply_env_overrides_from(
331            config,
332            vec![env_pair("LIMINAL_CLUSTER_LISTEN_ADDRESS", "127.0.0.1:9500")],
333        );
334
335        assert!(matches!(result, Err(ServerError::ConfigValidation { .. })));
336
337        Ok(())
338    }
339
340    #[test]
341    fn absent_environment_variables_leave_config_unchanged()
342    -> Result<(), Box<dyn std::error::Error>> {
343        let config = sample_config()?;
344        let original_address = config.listen_address;
345        let original_health_address = config.health_listen_address;
346        let original_drain_timeout_ms = config.drain_timeout_ms;
347        let original_path = config.persistence_path.clone();
348        let original_cluster_name = config
349            .cluster
350            .as_ref()
351            .map(|cluster| cluster.node_name.clone());
352
353        let config = apply_env_overrides_from(config, Vec::new())?;
354
355        assert_eq!(config.listen_address, original_address);
356        assert_eq!(config.health_listen_address, original_health_address);
357        assert_eq!(config.drain_timeout_ms, original_drain_timeout_ms);
358        assert_eq!(config.persistence_path, original_path);
359        assert_eq!(
360            config
361                .cluster
362                .as_ref()
363                .map(|cluster| cluster.node_name.clone()),
364            original_cluster_name
365        );
366
367        Ok(())
368    }
369
370    #[test]
371    fn invalid_listen_address_override_returns_config_load()
372    -> Result<(), Box<dyn std::error::Error>> {
373        let config = sample_config()?;
374        let result = apply_env_overrides_from(
375            config,
376            vec![env_pair("LIMINAL_LISTEN_ADDRESS", "not-a-socket")],
377        );
378
379        assert!(matches!(result, Err(ServerError::ConfigLoad { .. })));
380
381        Ok(())
382    }
383
384    #[test]
385    fn invalid_health_listen_address_override_returns_config_load()
386    -> Result<(), Box<dyn std::error::Error>> {
387        let config = sample_config()?;
388        let result = apply_env_overrides_from(
389            config,
390            vec![env_pair("LIMINAL_HEALTH_LISTEN_ADDRESS", "not-a-socket")],
391        );
392
393        assert!(matches!(result, Err(ServerError::ConfigLoad { .. })));
394
395        Ok(())
396    }
397
398    #[test]
399    fn invalid_drain_timeout_override_returns_config_load() -> Result<(), Box<dyn std::error::Error>>
400    {
401        let config = sample_config()?;
402        let result = apply_env_overrides_from(
403            config,
404            vec![env_pair("LIMINAL_DRAIN_TIMEOUT_MS", "not-a-number")],
405        );
406
407        assert!(matches!(result, Err(ServerError::ConfigLoad { .. })));
408
409        Ok(())
410    }
411
412    #[test]
413    fn cluster_override_without_cluster_section_returns_validation_error()
414    -> Result<(), Box<dyn std::error::Error>> {
415        let mut config = sample_config()?;
416        config.cluster = None;
417        let result = apply_env_overrides_from(
418            config,
419            vec![env_pair("LIMINAL_CLUSTER_NODE_NAME", "node-b")],
420        );
421
422        assert!(matches!(result, Err(ServerError::ConfigValidation { .. })));
423
424        Ok(())
425    }
426
427    #[test]
428    fn file_then_env_then_validate_pipeline_gives_env_precedence()
429    -> Result<(), Box<dyn std::error::Error>> {
430        let toml = r#"
431listen_address = "127.0.0.1:8080"
432health_listen_address = "127.0.0.1:8081"
433drain_timeout_ms = 30000
434persistence_path = "/tmp"
435
436[[channels]]
437name = "orders"
438schema_ref = "schemas/orders.json"
439durable = true
440
441[[routing_rules]]
442source_channel = "orders"
443target_channel = "orders"
444"#;
445        let path = write_temp_config(toml)?;
446        let config = load_from_file(&path)?;
447        let config = apply_env_overrides_from(
448            config,
449            vec![env_pair("LIMINAL_LISTEN_ADDRESS", "0.0.0.0:9090")],
450        )?;
451        validate(&config)?;
452        remove_temp_file(&path)?;
453
454        assert_eq!(config.listen_address, socket("0.0.0.0:9090")?);
455
456        Ok(())
457    }
458}