Skip to main content

liminal_server/config/
validation.rs

1use std::collections::{BTreeMap, BTreeSet};
2
3use crate::ServerError;
4
5use super::types::ServerConfig;
6
7/// Validates a fully loaded server configuration before startup.
8///
9/// Validation is intentionally limited to deterministic semantic checks and
10/// filesystem metadata inspection. It does not bind sockets, connect to peers,
11/// or perform any other network I/O.
12///
13/// # Errors
14///
15/// Returns [`ServerError::ConfigValidation`] containing all discovered validation
16/// errors when the configuration is not safe to use for startup.
17pub fn validate(config: &ServerConfig) -> Result<(), ServerError> {
18    let mut errors = Vec::new();
19
20    validate_listen_address(config, &mut errors);
21    validate_health_listen_address(config, &mut errors);
22    validate_drain_timeout(config, &mut errors);
23    validate_channels(config, &mut errors);
24    validate_routing_rules(config, &mut errors);
25    validate_persistence_path(config, &mut errors);
26    validate_cluster(config, &mut errors);
27
28    if errors.is_empty() {
29        Ok(())
30    } else {
31        Err(ServerError::ConfigValidation {
32            message: errors.join("; "),
33        })
34    }
35}
36
37fn validate_listen_address(config: &ServerConfig, errors: &mut Vec<String>) {
38    if config.listen_address.port() == 0 {
39        errors.push("listen_address: port must be non-zero".to_owned());
40    }
41}
42
43fn validate_health_listen_address(config: &ServerConfig, errors: &mut Vec<String>) {
44    if config.health_listen_address.port() == 0 {
45        errors.push("health_listen_address: port must be non-zero".to_owned());
46    }
47
48    if config.health_listen_address == config.listen_address {
49        errors.push(
50            "health_listen_address: must differ from listen_address for probe isolation".to_owned(),
51        );
52    } else if config.health_listen_address.port() == config.listen_address.port() {
53        errors.push(
54            "health_listen_address: port must differ from listen_address port for probe isolation"
55                .to_owned(),
56        );
57    }
58}
59
60fn validate_drain_timeout(config: &ServerConfig, errors: &mut Vec<String>) {
61    if config.drain_timeout_ms == 0 {
62        errors.push("drain_timeout_ms: must be greater than zero".to_owned());
63    }
64}
65
66fn validate_channels(config: &ServerConfig, errors: &mut Vec<String>) {
67    let mut seen = BTreeSet::new();
68    let mut duplicates = BTreeSet::new();
69
70    for channel in &config.channels {
71        let name = channel.name.trim();
72        if name.is_empty() {
73            errors.push("channels.name: channel name must not be empty".to_owned());
74            continue;
75        }
76
77        if !seen.insert(name.to_owned()) {
78            duplicates.insert(name.to_owned());
79        }
80    }
81
82    if !duplicates.is_empty() {
83        let names = duplicates.into_iter().collect::<Vec<_>>().join(", ");
84        errors.push(format!("channels.name: duplicate channel names: {names}"));
85    }
86}
87
88fn validate_routing_rules(config: &ServerConfig, errors: &mut Vec<String>) {
89    let channel_names = config
90        .channels
91        .iter()
92        .map(|channel| channel.name.as_str())
93        .collect::<BTreeSet<_>>();
94
95    for (index, rule) in config.routing_rules.iter().enumerate() {
96        let source = rule.source_channel.trim();
97        if source.is_empty() {
98            errors.push(format!(
99                "routing_rules[{index}].source_channel: source channel must not be empty"
100            ));
101        } else if !channel_names.contains(source) {
102            errors.push(format!(
103                "routing_rules[{index}].source_channel: unknown channel '{source}'"
104            ));
105        }
106
107        let target = rule.target_channel.trim();
108        if target.is_empty() {
109            errors.push(format!(
110                "routing_rules[{index}].target_channel: target channel must not be empty"
111            ));
112        } else if !channel_names.contains(target) {
113            errors.push(format!(
114                "routing_rules[{index}].target_channel: unknown channel '{target}'"
115            ));
116        }
117    }
118}
119
120fn validate_persistence_path(config: &ServerConfig, errors: &mut Vec<String>) {
121    let Some(path) = config.persistence_path.as_deref() else {
122        return;
123    };
124
125    match std::fs::metadata(path) {
126        Ok(metadata) => {
127            if !metadata.is_dir() {
128                errors.push(format!(
129                    "persistence_path '{}': path must be an existing directory",
130                    path.display()
131                ));
132            } else if metadata.permissions().readonly() {
133                errors.push(format!(
134                    "persistence_path '{}': path is not writable",
135                    path.display()
136                ));
137            }
138        }
139        Err(error) => {
140            errors.push(format!(
141                "persistence_path '{}': path is unreachable: {error}",
142                path.display()
143            ));
144        }
145    }
146}
147
148fn validate_cluster(config: &ServerConfig, errors: &mut Vec<String>) {
149    let Some(cluster) = config.cluster.as_ref() else {
150        return;
151    };
152
153    if cluster.node_name.trim().is_empty() {
154        errors.push("cluster.node_name: node name must not be empty".to_owned());
155    }
156
157    if cluster.cookie.is_empty() {
158        errors.push("cluster.cookie: distribution cookie must not be empty".to_owned());
159    }
160
161    if cluster.listen_address.port() == 0 {
162        errors.push("cluster.listen_address: distribution port must be non-zero".to_owned());
163    }
164
165    if cluster.listen_address == config.listen_address {
166        errors.push(
167            "cluster.listen_address: distribution port must differ from the client listen_address"
168                .to_owned(),
169        );
170    }
171
172    let mut seed_node_counts = BTreeMap::new();
173    for (index, seed_node) in cluster.seed_nodes.iter().enumerate() {
174        if seed_node.port() == 0 {
175            errors.push(format!(
176                "cluster.seed_nodes[{index}]: seed node port must be non-zero"
177            ));
178        }
179        seed_node_counts
180            .entry(seed_node.to_string())
181            .and_modify(|count| *count += 1)
182            .or_insert(1_usize);
183    }
184
185    let duplicates = seed_node_counts
186        .into_iter()
187        .filter_map(|(seed_node, count)| (count > 1).then_some(seed_node))
188        .collect::<Vec<_>>();
189
190    if !duplicates.is_empty() {
191        errors.push(format!(
192            "cluster.seed_nodes: duplicate seed nodes: {}",
193            duplicates.join(", ")
194        ));
195    }
196}
197
198#[cfg(test)]
199mod tests {
200    use std::fs;
201    use std::net::SocketAddr;
202    use std::path::PathBuf;
203    use std::sync::atomic::{AtomicU64, Ordering};
204
205    use crate::ServerError;
206
207    use super::validate;
208    use crate::config::types::{ChannelDef, ClusterConfig, RoutingRuleDef, ServerConfig};
209
210    static NEXT_TEMP_DIR_ID: AtomicU64 = AtomicU64::new(0);
211
212    fn socket(address: &str) -> Result<SocketAddr, Box<dyn std::error::Error>> {
213        Ok(address.parse()?)
214    }
215
216    fn sample_config() -> Result<ServerConfig, Box<dyn std::error::Error>> {
217        Ok(ServerConfig {
218            listen_address: socket("127.0.0.1:8080")?,
219            health_listen_address: socket("127.0.0.1:8081")?,
220            drain_timeout_ms: 30_000,
221            channels: vec![ChannelDef {
222                name: "orders".to_owned(),
223                schema_ref: "schemas/orders.json".to_owned(),
224                durable: true,
225            }],
226            routing_rules: vec![RoutingRuleDef {
227                source_channel: "orders".to_owned(),
228                target_channel: "orders".to_owned(),
229                predicate: None,
230            }],
231            persistence_path: None,
232            cluster: Some(ClusterConfig {
233                node_name: "node-a".to_owned(),
234                listen_address: socket("127.0.0.1:9000")?,
235                seed_nodes: vec![socket("127.0.0.1:9001")?],
236                cookie: "test-cookie".to_owned(),
237            }),
238        })
239    }
240
241    fn unique_temp_dir(label: &str) -> PathBuf {
242        let id = NEXT_TEMP_DIR_ID.fetch_add(1, Ordering::Relaxed);
243        std::env::temp_dir().join(format!(
244            "liminal-server-validation-{label}-{}-{id}",
245            std::process::id()
246        ))
247    }
248
249    fn config_validation_message(result: Result<(), ServerError>) -> String {
250        let Err(ServerError::ConfigValidation { message }) = result else {
251            return String::new();
252        };
253        message
254    }
255
256    #[test]
257    fn valid_config_passes_validation() -> Result<(), Box<dyn std::error::Error>> {
258        let config = sample_config()?;
259
260        validate(&config)?;
261
262        Ok(())
263    }
264
265    #[test]
266    fn invalid_listen_address_reports_field_name() -> Result<(), Box<dyn std::error::Error>> {
267        let mut config = sample_config()?;
268        config.listen_address = socket("127.0.0.1:0")?;
269
270        let message = config_validation_message(validate(&config));
271
272        assert!(message.contains("listen_address"));
273        assert!(message.contains("port"));
274
275        Ok(())
276    }
277
278    #[test]
279    fn invalid_health_listen_address_reports_field_name() -> Result<(), Box<dyn std::error::Error>>
280    {
281        let mut config = sample_config()?;
282        config.health_listen_address = socket("127.0.0.1:0")?;
283
284        let message = config_validation_message(validate(&config));
285
286        assert!(message.contains("health_listen_address"));
287        assert!(message.contains("port"));
288
289        Ok(())
290    }
291
292    #[test]
293    fn matching_health_and_main_listen_addresses_are_rejected()
294    -> Result<(), Box<dyn std::error::Error>> {
295        let mut config = sample_config()?;
296        config.health_listen_address = config.listen_address;
297
298        let message = config_validation_message(validate(&config));
299
300        assert!(message.contains("health_listen_address"));
301        assert!(message.contains("listen_address"));
302
303        Ok(())
304    }
305
306    #[test]
307    fn matching_health_and_main_listen_ports_are_rejected() -> Result<(), Box<dyn std::error::Error>>
308    {
309        let mut config = sample_config()?;
310        config.health_listen_address = socket("0.0.0.0:8080")?;
311
312        let message = config_validation_message(validate(&config));
313
314        assert!(message.contains("health_listen_address"));
315        assert!(message.contains("port"));
316
317        Ok(())
318    }
319
320    #[test]
321    fn zero_drain_timeout_is_rejected() -> Result<(), Box<dyn std::error::Error>> {
322        let mut config = sample_config()?;
323        config.drain_timeout_ms = 0;
324
325        let message = config_validation_message(validate(&config));
326
327        assert!(message.contains("drain_timeout_ms"));
328        assert!(message.contains("greater than zero"));
329
330        Ok(())
331    }
332
333    #[test]
334    fn duplicate_channel_names_are_listed() -> Result<(), Box<dyn std::error::Error>> {
335        let mut config = sample_config()?;
336        config.channels.push(ChannelDef {
337            name: "orders".to_owned(),
338            schema_ref: "schemas/orders-v2.json".to_owned(),
339            durable: false,
340        });
341
342        let message = config_validation_message(validate(&config));
343
344        assert!(message.contains("duplicate"));
345        assert!(message.contains("orders"));
346
347        Ok(())
348    }
349
350    #[test]
351    fn unreachable_persistence_path_reports_path() -> Result<(), Box<dyn std::error::Error>> {
352        let mut config = sample_config()?;
353        let path = unique_temp_dir("missing");
354        config.persistence_path = Some(path.clone());
355
356        let message = config_validation_message(validate(&config));
357
358        assert!(message.contains("persistence_path"));
359        assert!(message.contains(&path.display().to_string()));
360
361        Ok(())
362    }
363
364    #[test]
365    fn file_persistence_path_is_rejected() -> Result<(), Box<dyn std::error::Error>> {
366        let mut config = sample_config()?;
367        let path = unique_temp_dir("file");
368        fs::write(&path, "not a directory")?;
369        config.persistence_path = Some(path.clone());
370
371        let message = config_validation_message(validate(&config));
372        fs::remove_file(&path)?;
373
374        assert!(message.contains("persistence_path"));
375        assert!(message.contains("directory"));
376
377        Ok(())
378    }
379
380    #[test]
381    fn multiple_validation_errors_are_reported_together() -> Result<(), Box<dyn std::error::Error>>
382    {
383        let mut config = sample_config()?;
384        let missing_path = unique_temp_dir("multi-missing");
385        config.listen_address = socket("127.0.0.1:0")?;
386        config.channels.push(ChannelDef {
387            name: "orders".to_owned(),
388            schema_ref: "schemas/orders-v2.json".to_owned(),
389            durable: false,
390        });
391        config.persistence_path = Some(missing_path.clone());
392
393        let message = config_validation_message(validate(&config));
394
395        assert!(message.contains("listen_address"));
396        assert!(message.contains("duplicate channel names: orders"));
397        assert!(message.contains(&missing_path.display().to_string()));
398
399        Ok(())
400    }
401
402    #[test]
403    fn routing_rules_reference_configured_channels() -> Result<(), Box<dyn std::error::Error>> {
404        let mut config = sample_config()?;
405        config.routing_rules[0].target_channel = "unknown".to_owned();
406
407        let message = config_validation_message(validate(&config));
408
409        assert!(message.contains("routing_rules[0].target_channel"));
410        assert!(message.contains("unknown"));
411
412        Ok(())
413    }
414}