liminal_server/config/
validation.rs1use std::collections::{BTreeMap, BTreeSet};
2
3use crate::ServerError;
4
5use super::types::ServerConfig;
6
7pub fn validate(config: &ServerConfig) -> Result<(), ServerError> {
18 let mut errors = Vec::new();
19
20 validate_listen_address(config, &mut errors);
21 validate_health_listen_address(config, &mut errors);
22 validate_drain_timeout(config, &mut errors);
23 validate_channels(config, &mut errors);
24 validate_routing_rules(config, &mut errors);
25 validate_persistence_path(config, &mut errors);
26 validate_cluster(config, &mut errors);
27
28 if errors.is_empty() {
29 Ok(())
30 } else {
31 Err(ServerError::ConfigValidation {
32 message: errors.join("; "),
33 })
34 }
35}
36
37fn validate_listen_address(config: &ServerConfig, errors: &mut Vec<String>) {
38 if config.listen_address.port() == 0 {
39 errors.push("listen_address: port must be non-zero".to_owned());
40 }
41}
42
43fn validate_health_listen_address(config: &ServerConfig, errors: &mut Vec<String>) {
44 if config.health_listen_address.port() == 0 {
45 errors.push("health_listen_address: port must be non-zero".to_owned());
46 }
47
48 if config.health_listen_address == config.listen_address {
49 errors.push(
50 "health_listen_address: must differ from listen_address for probe isolation".to_owned(),
51 );
52 } else if config.health_listen_address.port() == config.listen_address.port() {
53 errors.push(
54 "health_listen_address: port must differ from listen_address port for probe isolation"
55 .to_owned(),
56 );
57 }
58}
59
60fn validate_drain_timeout(config: &ServerConfig, errors: &mut Vec<String>) {
61 if config.drain_timeout_ms == 0 {
62 errors.push("drain_timeout_ms: must be greater than zero".to_owned());
63 }
64}
65
66fn validate_channels(config: &ServerConfig, errors: &mut Vec<String>) {
67 let mut seen = BTreeSet::new();
68 let mut duplicates = BTreeSet::new();
69
70 for channel in &config.channels {
71 let name = channel.name.trim();
72 if name.is_empty() {
73 errors.push("channels.name: channel name must not be empty".to_owned());
74 continue;
75 }
76
77 if !seen.insert(name.to_owned()) {
78 duplicates.insert(name.to_owned());
79 }
80 }
81
82 if !duplicates.is_empty() {
83 let names = duplicates.into_iter().collect::<Vec<_>>().join(", ");
84 errors.push(format!("channels.name: duplicate channel names: {names}"));
85 }
86}
87
88fn validate_routing_rules(config: &ServerConfig, errors: &mut Vec<String>) {
89 let channel_names = config
90 .channels
91 .iter()
92 .map(|channel| channel.name.as_str())
93 .collect::<BTreeSet<_>>();
94
95 for (index, rule) in config.routing_rules.iter().enumerate() {
96 let source = rule.source_channel.trim();
97 if source.is_empty() {
98 errors.push(format!(
99 "routing_rules[{index}].source_channel: source channel must not be empty"
100 ));
101 } else if !channel_names.contains(source) {
102 errors.push(format!(
103 "routing_rules[{index}].source_channel: unknown channel '{source}'"
104 ));
105 }
106
107 let target = rule.target_channel.trim();
108 if target.is_empty() {
109 errors.push(format!(
110 "routing_rules[{index}].target_channel: target channel must not be empty"
111 ));
112 } else if !channel_names.contains(target) {
113 errors.push(format!(
114 "routing_rules[{index}].target_channel: unknown channel '{target}'"
115 ));
116 }
117 }
118}
119
120fn validate_persistence_path(config: &ServerConfig, errors: &mut Vec<String>) {
121 let Some(path) = config.persistence_path.as_deref() else {
122 return;
123 };
124
125 match std::fs::metadata(path) {
126 Ok(metadata) => {
127 if !metadata.is_dir() {
128 errors.push(format!(
129 "persistence_path '{}': path must be an existing directory",
130 path.display()
131 ));
132 } else if metadata.permissions().readonly() {
133 errors.push(format!(
134 "persistence_path '{}': path is not writable",
135 path.display()
136 ));
137 }
138 }
139 Err(error) => {
140 errors.push(format!(
141 "persistence_path '{}': path is unreachable: {error}",
142 path.display()
143 ));
144 }
145 }
146}
147
148fn validate_cluster(config: &ServerConfig, errors: &mut Vec<String>) {
149 let Some(cluster) = config.cluster.as_ref() else {
150 return;
151 };
152
153 if cluster.node_name.trim().is_empty() {
154 errors.push("cluster.node_name: node name must not be empty".to_owned());
155 }
156
157 if cluster.cookie.is_empty() {
158 errors.push("cluster.cookie: distribution cookie must not be empty".to_owned());
159 }
160
161 if cluster.listen_address.port() == 0 {
162 errors.push("cluster.listen_address: distribution port must be non-zero".to_owned());
163 }
164
165 if cluster.listen_address == config.listen_address {
166 errors.push(
167 "cluster.listen_address: distribution port must differ from the client listen_address"
168 .to_owned(),
169 );
170 }
171
172 let mut seed_node_counts = BTreeMap::new();
173 for (index, seed_node) in cluster.seed_nodes.iter().enumerate() {
174 if seed_node.port() == 0 {
175 errors.push(format!(
176 "cluster.seed_nodes[{index}]: seed node port must be non-zero"
177 ));
178 }
179 seed_node_counts
180 .entry(seed_node.to_string())
181 .and_modify(|count| *count += 1)
182 .or_insert(1_usize);
183 }
184
185 let duplicates = seed_node_counts
186 .into_iter()
187 .filter_map(|(seed_node, count)| (count > 1).then_some(seed_node))
188 .collect::<Vec<_>>();
189
190 if !duplicates.is_empty() {
191 errors.push(format!(
192 "cluster.seed_nodes: duplicate seed nodes: {}",
193 duplicates.join(", ")
194 ));
195 }
196}
197
198#[cfg(test)]
199mod tests {
200 use std::fs;
201 use std::net::SocketAddr;
202 use std::path::PathBuf;
203 use std::sync::atomic::{AtomicU64, Ordering};
204
205 use crate::ServerError;
206
207 use super::validate;
208 use crate::config::types::{ChannelDef, ClusterConfig, RoutingRuleDef, ServerConfig};
209
210 static NEXT_TEMP_DIR_ID: AtomicU64 = AtomicU64::new(0);
211
212 fn socket(address: &str) -> Result<SocketAddr, Box<dyn std::error::Error>> {
213 Ok(address.parse()?)
214 }
215
216 fn sample_config() -> Result<ServerConfig, Box<dyn std::error::Error>> {
217 Ok(ServerConfig {
218 listen_address: socket("127.0.0.1:8080")?,
219 health_listen_address: socket("127.0.0.1:8081")?,
220 drain_timeout_ms: 30_000,
221 channels: vec![ChannelDef {
222 name: "orders".to_owned(),
223 schema_ref: "schemas/orders.json".to_owned(),
224 durable: true,
225 }],
226 routing_rules: vec![RoutingRuleDef {
227 source_channel: "orders".to_owned(),
228 target_channel: "orders".to_owned(),
229 predicate: None,
230 }],
231 persistence_path: None,
232 cluster: Some(ClusterConfig {
233 node_name: "node-a".to_owned(),
234 listen_address: socket("127.0.0.1:9000")?,
235 seed_nodes: vec![socket("127.0.0.1:9001")?],
236 cookie: "test-cookie".to_owned(),
237 }),
238 })
239 }
240
241 fn unique_temp_dir(label: &str) -> PathBuf {
242 let id = NEXT_TEMP_DIR_ID.fetch_add(1, Ordering::Relaxed);
243 std::env::temp_dir().join(format!(
244 "liminal-server-validation-{label}-{}-{id}",
245 std::process::id()
246 ))
247 }
248
249 fn config_validation_message(result: Result<(), ServerError>) -> String {
250 let Err(ServerError::ConfigValidation { message }) = result else {
251 return String::new();
252 };
253 message
254 }
255
256 #[test]
257 fn valid_config_passes_validation() -> Result<(), Box<dyn std::error::Error>> {
258 let config = sample_config()?;
259
260 validate(&config)?;
261
262 Ok(())
263 }
264
265 #[test]
266 fn invalid_listen_address_reports_field_name() -> Result<(), Box<dyn std::error::Error>> {
267 let mut config = sample_config()?;
268 config.listen_address = socket("127.0.0.1:0")?;
269
270 let message = config_validation_message(validate(&config));
271
272 assert!(message.contains("listen_address"));
273 assert!(message.contains("port"));
274
275 Ok(())
276 }
277
278 #[test]
279 fn invalid_health_listen_address_reports_field_name() -> Result<(), Box<dyn std::error::Error>>
280 {
281 let mut config = sample_config()?;
282 config.health_listen_address = socket("127.0.0.1:0")?;
283
284 let message = config_validation_message(validate(&config));
285
286 assert!(message.contains("health_listen_address"));
287 assert!(message.contains("port"));
288
289 Ok(())
290 }
291
292 #[test]
293 fn matching_health_and_main_listen_addresses_are_rejected()
294 -> Result<(), Box<dyn std::error::Error>> {
295 let mut config = sample_config()?;
296 config.health_listen_address = config.listen_address;
297
298 let message = config_validation_message(validate(&config));
299
300 assert!(message.contains("health_listen_address"));
301 assert!(message.contains("listen_address"));
302
303 Ok(())
304 }
305
306 #[test]
307 fn matching_health_and_main_listen_ports_are_rejected() -> Result<(), Box<dyn std::error::Error>>
308 {
309 let mut config = sample_config()?;
310 config.health_listen_address = socket("0.0.0.0:8080")?;
311
312 let message = config_validation_message(validate(&config));
313
314 assert!(message.contains("health_listen_address"));
315 assert!(message.contains("port"));
316
317 Ok(())
318 }
319
320 #[test]
321 fn zero_drain_timeout_is_rejected() -> Result<(), Box<dyn std::error::Error>> {
322 let mut config = sample_config()?;
323 config.drain_timeout_ms = 0;
324
325 let message = config_validation_message(validate(&config));
326
327 assert!(message.contains("drain_timeout_ms"));
328 assert!(message.contains("greater than zero"));
329
330 Ok(())
331 }
332
333 #[test]
334 fn duplicate_channel_names_are_listed() -> Result<(), Box<dyn std::error::Error>> {
335 let mut config = sample_config()?;
336 config.channels.push(ChannelDef {
337 name: "orders".to_owned(),
338 schema_ref: "schemas/orders-v2.json".to_owned(),
339 durable: false,
340 });
341
342 let message = config_validation_message(validate(&config));
343
344 assert!(message.contains("duplicate"));
345 assert!(message.contains("orders"));
346
347 Ok(())
348 }
349
350 #[test]
351 fn unreachable_persistence_path_reports_path() -> Result<(), Box<dyn std::error::Error>> {
352 let mut config = sample_config()?;
353 let path = unique_temp_dir("missing");
354 config.persistence_path = Some(path.clone());
355
356 let message = config_validation_message(validate(&config));
357
358 assert!(message.contains("persistence_path"));
359 assert!(message.contains(&path.display().to_string()));
360
361 Ok(())
362 }
363
364 #[test]
365 fn file_persistence_path_is_rejected() -> Result<(), Box<dyn std::error::Error>> {
366 let mut config = sample_config()?;
367 let path = unique_temp_dir("file");
368 fs::write(&path, "not a directory")?;
369 config.persistence_path = Some(path.clone());
370
371 let message = config_validation_message(validate(&config));
372 fs::remove_file(&path)?;
373
374 assert!(message.contains("persistence_path"));
375 assert!(message.contains("directory"));
376
377 Ok(())
378 }
379
380 #[test]
381 fn multiple_validation_errors_are_reported_together() -> Result<(), Box<dyn std::error::Error>>
382 {
383 let mut config = sample_config()?;
384 let missing_path = unique_temp_dir("multi-missing");
385 config.listen_address = socket("127.0.0.1:0")?;
386 config.channels.push(ChannelDef {
387 name: "orders".to_owned(),
388 schema_ref: "schemas/orders-v2.json".to_owned(),
389 durable: false,
390 });
391 config.persistence_path = Some(missing_path.clone());
392
393 let message = config_validation_message(validate(&config));
394
395 assert!(message.contains("listen_address"));
396 assert!(message.contains("duplicate channel names: orders"));
397 assert!(message.contains(&missing_path.display().to_string()));
398
399 Ok(())
400 }
401
402 #[test]
403 fn routing_rules_reference_configured_channels() -> Result<(), Box<dyn std::error::Error>> {
404 let mut config = sample_config()?;
405 config.routing_rules[0].target_channel = "unknown".to_owned();
406
407 let message = config_validation_message(validate(&config));
408
409 assert!(message.contains("routing_rules[0].target_channel"));
410 assert!(message.contains("unknown"));
411
412 Ok(())
413 }
414}