1use std::ffi::{OsStr, OsString};
2use std::net::SocketAddr;
3use std::path::PathBuf;
4
5use crate::ServerError;
6
7use super::types::ServerConfig;
8
/// Prefix shared by every override name below; variables without it are skipped early.
const ENV_PREFIX: &str = "LIMINAL_";
/// Overrides `ServerConfig::listen_address`; the value is parsed as a socket address.
const LISTEN_ADDRESS: &str = "LIMINAL_LISTEN_ADDRESS";
/// Overrides `ServerConfig::health_listen_address`; the value is parsed as a socket address.
const HEALTH_LISTEN_ADDRESS: &str = "LIMINAL_HEALTH_LISTEN_ADDRESS";
/// Overrides `ServerConfig::drain_timeout_ms`; the value is parsed as a `u64`.
const DRAIN_TIMEOUT_MS: &str = "LIMINAL_DRAIN_TIMEOUT_MS";
/// Overrides `ServerConfig::persistence_path`; the raw OS string is used as the path.
const PERSISTENCE_PATH: &str = "LIMINAL_PERSISTENCE_PATH";
/// Overrides the cluster `node_name`; requires an existing cluster configuration.
const CLUSTER_NODE_NAME: &str = "LIMINAL_CLUSTER_NODE_NAME";
/// Overrides the cluster `seed_nodes` with a comma-separated list of socket addresses.
const CLUSTER_SEED_NODES: &str = "LIMINAL_CLUSTER_SEED_NODES";
/// Overrides the cluster `listen_address`; the value is parsed as a socket address.
const CLUSTER_LISTEN_ADDRESS: &str = "LIMINAL_CLUSTER_LISTEN_ADDRESS";
/// Overrides the cluster `cookie`; requires an existing cluster configuration.
const CLUSTER_COOKIE: &str = "LIMINAL_CLUSTER_COOKIE";
18
19pub fn apply_env_overrides(config: ServerConfig) -> Result<ServerConfig, ServerError> {
32 apply_env_overrides_from(config, std::env::vars_os())
33}
34
35pub(crate) fn apply_env_overrides_from<I>(
36 mut config: ServerConfig,
37 variables: I,
38) -> Result<ServerConfig, ServerError>
39where
40 I: IntoIterator<Item = (OsString, OsString)>,
41{
42 for (key, value) in variables {
43 let Some(key) = key.to_str() else {
44 continue;
45 };
46
47 if !key.starts_with(ENV_PREFIX) {
48 continue;
49 }
50
51 match key {
52 LISTEN_ADDRESS => {
53 config.listen_address = parse_socket_addr(LISTEN_ADDRESS, &value)?;
54 }
55 HEALTH_LISTEN_ADDRESS => {
56 config.health_listen_address = parse_socket_addr(HEALTH_LISTEN_ADDRESS, &value)?;
57 }
58 DRAIN_TIMEOUT_MS => {
59 config.drain_timeout_ms = parse_u64(DRAIN_TIMEOUT_MS, &value)?;
60 }
61 PERSISTENCE_PATH => {
62 config.persistence_path = Some(PathBuf::from(value));
63 }
64 CLUSTER_NODE_NAME => {
65 let node_name = env_string(CLUSTER_NODE_NAME, &value)?;
66 cluster_required(&mut config, CLUSTER_NODE_NAME)?.node_name = node_name;
67 }
68 CLUSTER_SEED_NODES => {
69 let seed_nodes = parse_seed_nodes(&value)?;
70 cluster_required(&mut config, CLUSTER_SEED_NODES)?.seed_nodes = seed_nodes;
71 }
72 CLUSTER_LISTEN_ADDRESS => {
73 let listen_address = parse_socket_addr(CLUSTER_LISTEN_ADDRESS, &value)?;
74 cluster_required(&mut config, CLUSTER_LISTEN_ADDRESS)?.listen_address =
75 listen_address;
76 }
77 CLUSTER_COOKIE => {
78 let cookie = env_string(CLUSTER_COOKIE, &value)?;
79 cluster_required(&mut config, CLUSTER_COOKIE)?.cookie = cookie;
80 }
81 _ => {}
82 }
83 }
84
85 Ok(config)
86}
87
88fn parse_socket_addr(name: &str, value: &OsStr) -> Result<SocketAddr, ServerError> {
89 let value = env_string(name, value)?;
90 value.parse::<SocketAddr>().map_err(|error| {
91 config_load(format!(
92 "environment variable {name} must be a socket address: {error}"
93 ))
94 })
95}
96
97fn parse_u64(name: &str, value: &OsStr) -> Result<u64, ServerError> {
98 let value = env_string(name, value)?;
99 value.parse::<u64>().map_err(|error| {
100 config_load(format!(
101 "environment variable {name} must be an unsigned integer: {error}"
102 ))
103 })
104}
105
106fn parse_seed_nodes(value: &OsStr) -> Result<Vec<SocketAddr>, ServerError> {
107 let value = env_string(CLUSTER_SEED_NODES, value)?;
108 if value.trim().is_empty() {
109 return Ok(Vec::new());
110 }
111
112 value
113 .split(',')
114 .enumerate()
115 .map(|(index, candidate)| parse_seed_node(index, candidate))
116 .collect()
117}
118
119fn parse_seed_node(index: usize, candidate: &str) -> Result<SocketAddr, ServerError> {
120 let candidate = candidate.trim();
121 if candidate.is_empty() {
122 return Err(config_load(format!(
123 "environment variable {CLUSTER_SEED_NODES} contains an empty seed node at position {}",
124 index + 1
125 )));
126 }
127
128 candidate.parse::<SocketAddr>().map_err(|error| {
129 config_load(format!(
130 "environment variable {CLUSTER_SEED_NODES} contains invalid seed node '{}' at position {}: {error}",
131 candidate,
132 index + 1
133 ))
134 })
135}
136
137fn env_string(name: &str, value: &OsStr) -> Result<String, ServerError> {
138 value.to_str().map(str::to_owned).ok_or_else(|| {
139 config_load(format!(
140 "environment variable {name} contains non-Unicode data"
141 ))
142 })
143}
144
145fn cluster_required<'a>(
146 config: &'a mut ServerConfig,
147 name: &str,
148) -> Result<&'a mut super::types::ClusterConfig, ServerError> {
149 config
150 .cluster
151 .as_mut()
152 .ok_or_else(|| ServerError::ConfigValidation {
153 message: format!(
154 "environment variable {name} requires a [cluster] section in the configuration file"
155 ),
156 })
157}
158
/// Wraps `message` in a `ServerError::ConfigLoad`, the variant this module uses for
/// environment values that cannot be decoded or parsed.
const fn config_load(message: String) -> ServerError {
    ServerError::ConfigLoad { message }
}
162
/// Unit tests for the environment override layer.
///
/// Every test feeds an explicit variable list to `apply_env_overrides_from`, so none of them
/// reads or mutates the real process environment.
#[cfg(test)]
mod tests {
    use std::ffi::OsString;
    use std::net::SocketAddr;
    use std::path::{Path, PathBuf};

    use crate::ServerError;

    use super::apply_env_overrides_from;
    use crate::config::types::{ChannelDef, ClusterConfig, RoutingRuleDef, ServerConfig};
    use crate::config::{load_from_file, validate};

    /// Parses `address` into a [`SocketAddr`], surfacing parse failures as a test error.
    fn socket(address: &str) -> Result<SocketAddr, Box<dyn std::error::Error>> {
        Ok(address.parse()?)
    }

    /// Builds a fully populated configuration, including a cluster section, for tests to override.
    fn sample_config() -> Result<ServerConfig, Box<dyn std::error::Error>> {
        Ok(ServerConfig {
            listen_address: socket("127.0.0.1:8080")?,
            health_listen_address: socket("127.0.0.1:8081")?,
            drain_timeout_ms: 30_000,
            channels: vec![ChannelDef {
                name: "orders".to_owned(),
                schema_ref: "schemas/orders.json".to_owned(),
                durable: true,
            }],
            routing_rules: vec![RoutingRuleDef {
                source_channel: "orders".to_owned(),
                target_channel: "orders".to_owned(),
                predicate: None,
            }],
            persistence_path: Some(PathBuf::from("/tmp")),
            cluster: Some(ClusterConfig {
                node_name: "node-a".to_owned(),
                listen_address: socket("127.0.0.1:9000")?,
                seed_nodes: vec![socket("127.0.0.1:9001")?],
                cookie: "test-cookie".to_owned(),
            }),
        })
    }

    /// Builds one `(name, value)` pair in the shape `apply_env_overrides_from` consumes.
    fn env_pair(name: &str, value: &str) -> (OsString, OsString) {
        (OsString::from(name), OsString::from(value))
    }

    /// Writes `contents` to a file in the system temp directory and returns its path.
    ///
    /// The file name embeds only the process id, so it is unique per test process but not per
    /// call; callers are responsible for removing the file.
    fn write_temp_config(contents: &str) -> Result<PathBuf, Box<dyn std::error::Error>> {
        let path = std::env::temp_dir().join(format!(
            "liminal-server-env-pipeline-{}.toml",
            std::process::id()
        ));
        std::fs::write(&path, contents)?;
        Ok(path)
    }

    /// Removes `path` if it exists; a missing file is not an error.
    fn remove_temp_file(path: &Path) -> Result<(), Box<dyn std::error::Error>> {
        if path.exists() {
            std::fs::remove_file(path)?;
        }
        Ok(())
    }

    #[test]
    fn listen_address_override_replaces_file_value() -> Result<(), Box<dyn std::error::Error>> {
        let config = sample_config()?;
        let config = apply_env_overrides_from(
            config,
            vec![env_pair("LIMINAL_LISTEN_ADDRESS", "0.0.0.0:9090")],
        )?;

        assert_eq!(config.listen_address, socket("0.0.0.0:9090")?);

        Ok(())
    }

    #[test]
    fn health_listen_address_override_replaces_file_value() -> Result<(), Box<dyn std::error::Error>>
    {
        let config = sample_config()?;
        let config = apply_env_overrides_from(
            config,
            vec![env_pair("LIMINAL_HEALTH_LISTEN_ADDRESS", "0.0.0.0:9191")],
        )?;

        assert_eq!(config.health_listen_address, socket("0.0.0.0:9191")?);

        Ok(())
    }

    #[test]
    fn drain_timeout_override_replaces_file_value() -> Result<(), Box<dyn std::error::Error>> {
        let config = sample_config()?;
        let config =
            apply_env_overrides_from(config, vec![env_pair("LIMINAL_DRAIN_TIMEOUT_MS", "1250")])?;

        assert_eq!(config.drain_timeout_ms, 1250);

        Ok(())
    }

    #[test]
    fn persistence_path_override_replaces_file_value() -> Result<(), Box<dyn std::error::Error>> {
        let config = sample_config()?;
        let config = apply_env_overrides_from(
            config,
            vec![env_pair("LIMINAL_PERSISTENCE_PATH", "/var/lib/liminal")],
        )?;

        assert_eq!(
            config.persistence_path.as_deref(),
            Some(Path::new("/var/lib/liminal"))
        );

        Ok(())
    }

    #[test]
    fn cluster_overrides_replace_existing_cluster_values() -> Result<(), Box<dyn std::error::Error>>
    {
        let config = sample_config()?;
        let config = apply_env_overrides_from(
            config,
            vec![
                env_pair("LIMINAL_CLUSTER_NODE_NAME", "node-b"),
                // The space after the comma checks that entries are trimmed before parsing.
                env_pair(
                    "LIMINAL_CLUSTER_SEED_NODES",
                    "127.0.0.1:9100, 127.0.0.1:9200",
                ),
            ],
        )?;

        let Some(cluster) = config.cluster else {
            return Err("cluster config should remain present".into());
        };
        assert_eq!(cluster.node_name, "node-b");
        assert_eq!(cluster.seed_nodes.len(), 2);
        assert_eq!(cluster.seed_nodes[0], socket("127.0.0.1:9100")?);
        assert_eq!(cluster.seed_nodes[1], socket("127.0.0.1:9200")?);

        Ok(())
    }

    #[test]
    fn cluster_listen_address_and_cookie_overrides_replace_values()
    -> Result<(), Box<dyn std::error::Error>> {
        let config = sample_config()?;
        let config = apply_env_overrides_from(
            config,
            vec![
                env_pair("LIMINAL_CLUSTER_LISTEN_ADDRESS", "127.0.0.1:9500"),
                env_pair("LIMINAL_CLUSTER_COOKIE", "override-cookie"),
            ],
        )?;

        let Some(cluster) = config.cluster else {
            return Err("cluster config should remain present".into());
        };
        assert_eq!(cluster.listen_address, socket("127.0.0.1:9500")?);
        assert_eq!(cluster.cookie, "override-cookie");

        Ok(())
    }

    #[test]
    fn cluster_listen_address_override_without_cluster_section_returns_validation_error()
    -> Result<(), Box<dyn std::error::Error>> {
        let mut config = sample_config()?;
        config.cluster = None;
        let result = apply_env_overrides_from(
            config,
            vec![env_pair("LIMINAL_CLUSTER_LISTEN_ADDRESS", "127.0.0.1:9500")],
        );

        assert!(matches!(result, Err(ServerError::ConfigValidation { .. })));

        Ok(())
    }

    #[test]
    fn absent_environment_variables_leave_config_unchanged()
    -> Result<(), Box<dyn std::error::Error>> {
        let config = sample_config()?;
        let original_address = config.listen_address;
        let original_health_address = config.health_listen_address;
        let original_drain_timeout_ms = config.drain_timeout_ms;
        let original_path = config.persistence_path.clone();
        let original_cluster_name = config
            .cluster
            .as_ref()
            .map(|cluster| cluster.node_name.clone());

        let config = apply_env_overrides_from(config, Vec::new())?;

        assert_eq!(config.listen_address, original_address);
        assert_eq!(config.health_listen_address, original_health_address);
        assert_eq!(config.drain_timeout_ms, original_drain_timeout_ms);
        assert_eq!(config.persistence_path, original_path);
        assert_eq!(
            config
                .cluster
                .as_ref()
                .map(|cluster| cluster.node_name.clone()),
            original_cluster_name
        );

        Ok(())
    }

    #[test]
    fn invalid_listen_address_override_returns_config_load()
    -> Result<(), Box<dyn std::error::Error>> {
        let config = sample_config()?;
        let result = apply_env_overrides_from(
            config,
            vec![env_pair("LIMINAL_LISTEN_ADDRESS", "not-a-socket")],
        );

        assert!(matches!(result, Err(ServerError::ConfigLoad { .. })));

        Ok(())
    }

    #[test]
    fn invalid_health_listen_address_override_returns_config_load()
    -> Result<(), Box<dyn std::error::Error>> {
        let config = sample_config()?;
        let result = apply_env_overrides_from(
            config,
            vec![env_pair("LIMINAL_HEALTH_LISTEN_ADDRESS", "not-a-socket")],
        );

        assert!(matches!(result, Err(ServerError::ConfigLoad { .. })));

        Ok(())
    }

    #[test]
    fn invalid_drain_timeout_override_returns_config_load() -> Result<(), Box<dyn std::error::Error>>
    {
        let config = sample_config()?;
        let result = apply_env_overrides_from(
            config,
            vec![env_pair("LIMINAL_DRAIN_TIMEOUT_MS", "not-a-number")],
        );

        assert!(matches!(result, Err(ServerError::ConfigLoad { .. })));

        Ok(())
    }

    #[test]
    fn cluster_override_without_cluster_section_returns_validation_error()
    -> Result<(), Box<dyn std::error::Error>> {
        let mut config = sample_config()?;
        config.cluster = None;
        let result = apply_env_overrides_from(
            config,
            vec![env_pair("LIMINAL_CLUSTER_NODE_NAME", "node-b")],
        );

        assert!(matches!(result, Err(ServerError::ConfigValidation { .. })));

        Ok(())
    }

    #[test]
    fn file_then_env_then_validate_pipeline_gives_env_precedence()
    -> Result<(), Box<dyn std::error::Error>> {
        let toml = r#"
listen_address = "127.0.0.1:8080"
health_listen_address = "127.0.0.1:8081"
drain_timeout_ms = 30000
persistence_path = "/tmp"

[[channels]]
name = "orders"
schema_ref = "schemas/orders.json"
durable = true

[[routing_rules]]
source_channel = "orders"
target_channel = "orders"
"#;
        let path = write_temp_config(toml)?;

        // Only the load step reads the file. Remove it straight away, before any error is
        // propagated, so a failing load, override, or validation cannot leak it in the temp dir.
        let loaded = load_from_file(&path);
        let cleanup = remove_temp_file(&path);
        let config = loaded?;
        cleanup?;

        let config = apply_env_overrides_from(
            config,
            vec![env_pair("LIMINAL_LISTEN_ADDRESS", "0.0.0.0:9090")],
        )?;
        validate(&config)?;

        assert_eq!(config.listen_address, socket("0.0.0.0:9090")?);

        Ok(())
    }
}