Skip to main content

rrq_config/
config.rs

1use std::path::Path;
2use std::time::Duration;
3
4use anyhow::{Context, Result};
5use serde_json::{Map, Value};
6
7use crate::queue::normalize_queue_name;
8use crate::settings::{RRQSettings, RunnerManagementMode};
9use crate::tcp_socket::parse_tcp_socket_with_allowed_hosts;
10
11pub const DEFAULT_CONFIG_FILENAME: &str = "rrq.toml";
12pub const ENV_CONFIG_KEY: &str = "RRQ_CONFIG";
13
14#[must_use]
15pub fn resolve_config_source(config_path: Option<&str>) -> (Option<String>, String) {
16    if let Some(path) = config_path {
17        return (Some(path.to_string()), "--config parameter".to_string());
18    }
19
20    if let Ok(env_path) = std::env::var(ENV_CONFIG_KEY)
21        && !env_path.is_empty()
22    {
23        return (Some(env_path), format!("{ENV_CONFIG_KEY} env var"));
24    }
25
26    let default_path = Path::new(DEFAULT_CONFIG_FILENAME);
27    if default_path.is_file() {
28        return (
29            Some(default_path.to_string_lossy().to_string()),
30            format!("{DEFAULT_CONFIG_FILENAME} in cwd"),
31        );
32    }
33
34    (None, "not found".to_string())
35}
36
37pub fn load_toml_settings(config_path: Option<&str>) -> Result<RRQSettings> {
38    load_toml_settings_with_runner_mode(config_path, None)
39}
40
41pub fn load_toml_settings_with_runner_mode(
42    config_path: Option<&str>,
43    runner_mode_override: Option<RunnerManagementMode>,
44) -> Result<RRQSettings> {
45    dotenvy::dotenv().ok();
46
47    let (path, _) = resolve_config_source(config_path);
48    let path = path.ok_or_else(|| {
49        anyhow::anyhow!("RRQ config not found. Provide --config, set RRQ_CONFIG, or add rrq.toml.")
50    })?;
51
52    let payload = std::fs::read_to_string(&path)
53        .with_context(|| format!("failed to read config at {path}"))?;
54    let toml_value: toml::Value =
55        toml::from_str(&payload).with_context(|| format!("failed to parse TOML at {path}"))?;
56    let mut json_value =
57        serde_json::to_value(toml_value).context("failed to convert TOML to JSON")?;
58
59    json_value = normalize_toml_payload(json_value)?;
60    let env_overrides = env_overrides();
61    let merged = deep_merge(json_value, env_overrides);
62
63    let mut settings: RRQSettings = serde_json::from_value(merged.clone()).map_err(|err| {
64        let hint = diagnose_config_error(&merged, &err);
65        anyhow::anyhow!("invalid RRQ config: {err}{hint}")
66    })?;
67    if let Some(mode) = runner_mode_override {
68        settings.runner_management_mode = mode;
69    }
70    validate_runner_configs(&settings)?;
71    Ok(settings)
72}
73
74fn normalize_toml_payload(mut payload: Value) -> Result<Value> {
75    if let Value::Object(mut map) = payload {
76        if let Some(rrq_value) = map.remove("rrq") {
77            payload = rrq_value;
78        } else {
79            payload = Value::Object(map);
80        }
81    }
82
83    if let Value::Object(mut map) = payload {
84        if let Some(routing) = map.remove("routing") {
85            map.insert("runner_routes".to_string(), routing);
86        }
87        map.remove("worker_concurrency");
88        normalize_queue_fields(&mut map);
89        return Ok(Value::Object(map));
90    }
91
92    Err(anyhow::anyhow!("RRQ config must be a TOML table"))
93}
94
95fn normalize_queue_fields(map: &mut Map<String, Value>) {
96    if let Some(Value::String(queue_name)) = map.get_mut("default_queue_name") {
97        *queue_name = normalize_queue_name(queue_name);
98    }
99
100    if let Some(Value::Object(routes)) = map.get_mut("runner_routes") {
101        let mut normalized = Map::new();
102        for (queue_name, runner_name) in std::mem::take(routes) {
103            normalized.insert(normalize_queue_name(&queue_name), runner_name);
104        }
105        *routes = normalized;
106    }
107}
108
109fn env_overrides() -> Value {
110    let mut payload = Map::new();
111
112    set_env_string(&mut payload, "redis_dsn", "RRQ_REDIS_DSN");
113    set_env_bool(
114        &mut payload,
115        "capture_runner_output",
116        "RRQ_CAPTURE_RUNNER_OUTPUT",
117    );
118
119    Value::Object(payload)
120}
121
122fn set_env_string(map: &mut Map<String, Value>, key: &str, env: &str) {
123    if let Ok(value) = std::env::var(env)
124        && !value.is_empty()
125    {
126        map.insert(key.to_string(), Value::String(value));
127    }
128}
129
130fn set_env_bool(map: &mut Map<String, Value>, key: &str, env: &str) {
131    if let Ok(value) = std::env::var(env)
132        && !value.is_empty()
133    {
134        let b = matches!(value.as_str(), "1" | "true" | "yes");
135        map.insert(key.to_string(), Value::Bool(b));
136    }
137}
138
139fn deep_merge(base: Value, overlay: Value) -> Value {
140    match (base, overlay) {
141        (Value::Object(mut base_map), Value::Object(overlay_map)) => {
142            for (key, value) in overlay_map {
143                let entry = base_map.remove(&key);
144                let merged = match entry {
145                    Some(existing) => deep_merge(existing, value),
146                    None => value,
147                };
148                base_map.insert(key, merged);
149            }
150            Value::Object(base_map)
151        }
152        (_, overlay_value) => overlay_value,
153    }
154}
155
156fn runner_tcp_host_or_default(raw: Option<&str>) -> String {
157    match raw.map(str::trim).filter(|s| !s.is_empty()) {
158        Some(value) => value.to_string(),
159        None => "127.0.0.1".to_string(),
160    }
161}
162
163fn runner_tcp_socket_for_validation(host: &str, port: u16) -> String {
164    if host.contains(':') && !host.starts_with('[') && !host.ends_with(']') {
165        format!("[{host}]:{port}")
166    } else {
167        format!("{host}:{port}")
168    }
169}
170
171fn diagnose_config_error(config: &Value, err: &serde_json::Error) -> String {
172    let err_msg = err.to_string().to_lowercase();
173
174    // Check for unknown field errors in runner configs
175    if err_msg.contains("unknown field")
176        && let Some(runners) = config.get("runners").and_then(|v| v.as_object())
177    {
178        let valid_fields = [
179            "type",
180            "cmd",
181            "pool_size",
182            "max_in_flight",
183            "env",
184            "cwd",
185            "tcp_host",
186            "tcp_port",
187            "allowed_hosts",
188            "response_timeout_seconds",
189        ];
190        for (name, runner) in runners {
191            if let Some(obj) = runner.as_object() {
192                for key in obj.keys() {
193                    if !valid_fields.contains(&key.as_str()) {
194                        return format!(
195                            "\n\nHint: runner '{name}' has unknown field '{key}'. Valid fields are: {}",
196                            valid_fields.join(", ")
197                        );
198                    }
199                }
200            }
201        }
202    }
203
204    // Check for type errors in runner configs
205    if err_msg.contains("invalid type")
206        && let Some(runners) = config.get("runners").and_then(|v| v.as_object())
207    {
208        for (name, runner) in runners {
209            if let Some(obj) = runner.as_object() {
210                if let Some(pool_size) = obj.get("pool_size")
211                    && !pool_size.is_u64()
212                {
213                    return format!(
214                        "\n\nHint: runner '{name}' has invalid pool_size - must be a positive integer, got: {pool_size}"
215                    );
216                }
217                if let Some(max_in_flight) = obj.get("max_in_flight")
218                    && !max_in_flight.is_u64()
219                {
220                    return format!(
221                        "\n\nHint: runner '{name}' has invalid max_in_flight - must be a positive integer, got: {max_in_flight}"
222                    );
223                }
224                if let Some(cmd) = obj.get("cmd")
225                    && !cmd.is_array()
226                {
227                    return format!(
228                        "\n\nHint: runner '{name}' has invalid cmd - must be an array of strings, got: {cmd}"
229                    );
230                }
231            }
232        }
233    }
234
235    String::new()
236}
237
238fn validate_runner_configs(settings: &RRQSettings) -> Result<()> {
239    if settings.default_job_timeout_seconds <= 0 {
240        return Err(anyhow::anyhow!(
241            "default_job_timeout_seconds must be positive"
242        ));
243    }
244    if settings.default_lock_timeout_extension_seconds < 0 {
245        return Err(anyhow::anyhow!(
246            "default_lock_timeout_extension_seconds must be >= 0"
247        ));
248    }
249    settings
250        .default_job_timeout_seconds
251        .checked_add(settings.default_lock_timeout_extension_seconds)
252        .and_then(|sum| sum.checked_mul(1000))
253        .ok_or_else(|| anyhow::anyhow!("provisional claim lock timeout overflow"))?;
254
255    if !settings.runner_shutdown_term_grace_seconds.is_finite()
256        || settings.runner_shutdown_term_grace_seconds < 0.0
257    {
258        return Err(anyhow::anyhow!(
259            "runner_shutdown_term_grace_seconds must be a finite number >= 0"
260        ));
261    }
262    if settings.runner_shutdown_term_grace_seconds > Duration::MAX.as_secs_f64() {
263        return Err(anyhow::anyhow!(
264            "runner_shutdown_term_grace_seconds is too large (max {})",
265            Duration::MAX.as_secs_f64()
266        ));
267    }
268
269    if settings.runners.is_empty() {
270        return Ok(()); // No runners configured is valid (will error at worker startup)
271    }
272
273    let default_pool_size = num_cpus::get();
274    let external_mode = settings.runner_management_mode == RunnerManagementMode::External;
275    for (name, config) in &settings.runners {
276        // Validate pool_size if specified
277        if let Some(pool_size) = config.pool_size
278            && pool_size == 0
279        {
280            return Err(anyhow::anyhow!(
281                "runner '{name}' has invalid pool_size: 0 - must be a positive integer"
282            ));
283        }
284
285        // Validate max_in_flight if specified
286        if let Some(max_in_flight) = config.max_in_flight
287            && max_in_flight == 0
288        {
289            return Err(anyhow::anyhow!(
290                "runner '{name}' has invalid max_in_flight: 0 - must be a positive integer"
291            ));
292        }
293
294        // Check for missing required fields
295        if config.tcp_port.is_none() {
296            return Err(anyhow::anyhow!(
297                "runner '{name}' is missing required field 'tcp_port' (e.g., 9000)"
298            ));
299        }
300
301        if !external_mode && config.cmd.is_none() {
302            return Err(anyhow::anyhow!(
303                "runner '{name}' is missing required field 'cmd' (e.g., [\"rrq-runner\", \"--settings\", \"myapp.settings\"])"
304            ));
305        }
306
307        // Validate tcp host/port and enforce allowlists in external mode
308        if let Some(port) = config.tcp_port {
309            if port == 0 {
310                return Err(anyhow::anyhow!(
311                    "runner '{name}' has invalid tcp_port: 0 - must be in 1..=65535"
312                ));
313            }
314
315            let host = runner_tcp_host_or_default(config.tcp_host.as_deref());
316            if external_mode {
317                let socket = runner_tcp_socket_for_validation(&host, port);
318                parse_tcp_socket_with_allowed_hosts(
319                    &socket,
320                    config.allowed_hosts.as_deref().unwrap_or(&[]),
321                )
322                .with_context(|| format!("runner '{name}' has invalid endpoint '{socket}'"))?;
323            }
324
325            // Validate port range is sufficient for pool_size
326            let pool_size = if external_mode {
327                1
328            } else {
329                config.pool_size.unwrap_or(default_pool_size)
330            };
331            let port_span = u32::try_from(pool_size.saturating_sub(1)).map_err(|_| {
332                anyhow::anyhow!("runner '{name}' pool_size is too large: {pool_size}")
333            })?;
334            let max_port = u32::from(port) + port_span;
335            if max_port > u32::from(u16::MAX) {
336                return Err(anyhow::anyhow!(
337                    "runner '{name}' tcp_port range insufficient for pool_size {pool_size} \
338                    (port {} + {} would exceed 65535)",
339                    port,
340                    pool_size - 1
341                ));
342            }
343        }
344    }
345
346    Ok(())
347}
348
349#[cfg(test)]
350#[allow(unsafe_code)] // env var manipulation in tests
351mod tests {
352    use super::*;
353    use std::fs;
354    use std::sync::{Mutex, OnceLock};
355    use uuid::Uuid;
356
357    static ENV_LOCK: OnceLock<Mutex<()>> = OnceLock::new();
358
359    fn env_lock() -> &'static Mutex<()> {
360        ENV_LOCK.get_or_init(|| Mutex::new(()))
361    }
362
363    #[test]
364    fn load_toml_settings_merges_env_and_normalizes_fields() {
365        let _lock = env_lock().lock().unwrap();
366        unsafe {
367            std::env::set_var("RRQ_REDIS_DSN", "redis://localhost:6379/10");
368        }
369
370        let dir = tempfile::tempdir().unwrap();
371        let path = dir.path().join("rrq.toml");
372        let config = r#"
373        [rrq]
374        redis_dsn = "redis://localhost:6379/9"
375        default_queue_name = "from_toml"
376        [rrq.routing]
377        alpha = "python"
378        "#;
379        fs::write(&path, config).unwrap();
380
381        let settings = load_toml_settings(Some(path.to_str().unwrap())).unwrap();
382        assert_eq!(settings.redis_dsn, "redis://localhost:6379/10");
383        assert_eq!(settings.default_queue_name, "rrq:queue:from_toml");
384        assert_eq!(
385            settings.runner_routes.get("rrq:queue:alpha"),
386            Some(&"python".to_string())
387        );
388        assert!(!settings.runner_routes.contains_key("alpha"));
389    }
390
391    #[test]
392    fn resolve_config_source_prefers_explicit_path() {
393        let (path, source) = resolve_config_source(Some("custom.toml"));
394        assert_eq!(path, Some("custom.toml".to_string()));
395        assert_eq!(source, "--config parameter");
396    }
397
398    #[test]
399    fn resolve_config_source_falls_back_to_env() {
400        let _lock = env_lock().lock().unwrap();
401        let value = format!("rrq-{}.toml", Uuid::new_v4());
402        unsafe {
403            std::env::set_var(ENV_CONFIG_KEY, &value);
404        }
405        let (path, source) = resolve_config_source(None);
406        assert_eq!(path, Some(value));
407        assert!(source.contains(ENV_CONFIG_KEY));
408    }
409
410    #[test]
411    fn validate_runner_configs_accepts_valid_config() {
412        let _lock = env_lock().lock().unwrap();
413        let dir = tempfile::tempdir().unwrap();
414        let path = dir.path().join("rrq.toml");
415        let config = r#"
416        [rrq]
417        default_runner_name = "python"
418        [rrq.runners.python]
419        cmd = ["rrq-runner", "--settings", "myapp.settings"]
420        tcp_port = 9000
421        pool_size = 2
422        max_in_flight = 10
423        "#;
424        fs::write(&path, config).unwrap();
425        let settings = load_toml_settings(Some(path.to_str().unwrap())).unwrap();
426        assert_eq!(settings.runners.len(), 1);
427        assert!(settings.runners.contains_key("python"));
428    }
429
430    #[test]
431    fn validate_runner_configs_rejects_missing_tcp_port() {
432        let _lock = env_lock().lock().unwrap();
433        let dir = tempfile::tempdir().unwrap();
434        let path = dir.path().join("rrq.toml");
435        let config = r#"
436        [rrq]
437        [rrq.runners.python]
438        cmd = ["rrq-runner"]
439        "#;
440        fs::write(&path, config).unwrap();
441        let err = load_toml_settings(Some(path.to_str().unwrap())).unwrap_err();
442        assert!(err.to_string().contains("tcp_port"));
443    }
444
445    #[test]
446    fn validate_runner_configs_rejects_missing_cmd() {
447        let _lock = env_lock().lock().unwrap();
448        let dir = tempfile::tempdir().unwrap();
449        let path = dir.path().join("rrq.toml");
450        let config = r"
451        [rrq]
452        [rrq.runners.python]
453        tcp_port = 9000
454        ";
455        fs::write(&path, config).unwrap();
456        let err = load_toml_settings(Some(path.to_str().unwrap())).unwrap_err();
457        assert!(err.to_string().contains("cmd"));
458    }
459
460    #[test]
461    fn validate_runner_configs_allows_missing_cmd_in_external_mode() {
462        let _lock = env_lock().lock().unwrap();
463        let dir = tempfile::tempdir().unwrap();
464        let path = dir.path().join("rrq.toml");
465        let config = r#"
466        [rrq]
467        runner_management_mode = "external"
468        [rrq.runners.python]
469        tcp_port = 9000
470        "#;
471        fs::write(&path, config).unwrap();
472        let settings = load_toml_settings(Some(path.to_str().unwrap())).unwrap();
473        assert_eq!(
474            settings.runner_management_mode,
475            RunnerManagementMode::External
476        );
477        assert!(settings.runners.contains_key("python"));
478    }
479
480    #[test]
481    fn load_toml_settings_with_runner_mode_allows_external_override_missing_cmd() {
482        let _lock = env_lock().lock().unwrap();
483        let dir = tempfile::tempdir().unwrap();
484        let path = dir.path().join("rrq.toml");
485        let config = r"
486        [rrq]
487        [rrq.runners.python]
488        tcp_port = 9000
489        ";
490        fs::write(&path, config).unwrap();
491        let settings = load_toml_settings_with_runner_mode(
492            Some(path.to_str().unwrap()),
493            Some(RunnerManagementMode::External),
494        )
495        .unwrap();
496        assert_eq!(
497            settings.runner_management_mode,
498            RunnerManagementMode::External
499        );
500        assert!(settings.runners.contains_key("python"));
501    }
502
503    #[test]
504    fn load_toml_settings_with_runner_mode_managed_override_requires_cmd() {
505        let _lock = env_lock().lock().unwrap();
506        let dir = tempfile::tempdir().unwrap();
507        let path = dir.path().join("rrq.toml");
508        let config = r#"
509        [rrq]
510        runner_management_mode = "external"
511        [rrq.runners.python]
512        tcp_port = 9000
513        "#;
514        fs::write(&path, config).unwrap();
515        let err = load_toml_settings_with_runner_mode(
516            Some(path.to_str().unwrap()),
517            Some(RunnerManagementMode::Managed),
518        )
519        .unwrap_err();
520        assert!(err.to_string().contains("cmd"));
521    }
522
523    #[test]
524    fn validate_runner_configs_rejects_non_loopback_in_external_without_allowlist() {
525        let _lock = env_lock().lock().unwrap();
526        let dir = tempfile::tempdir().unwrap();
527        let path = dir.path().join("rrq.toml");
528        let config = r#"
529        [rrq]
530        runner_management_mode = "external"
531        [rrq.runners.python]
532        tcp_host = "10.0.0.1"
533        tcp_port = 9000
534        "#;
535        fs::write(&path, config).unwrap();
536        let err = load_toml_settings(Some(path.to_str().unwrap())).unwrap_err();
537        // Error chain includes "loopback" in the cause
538        let full_err = format!("{err:?}");
539        assert!(full_err.contains("loopback"), "error was: {full_err}");
540    }
541
542    #[test]
543    fn validate_runner_configs_allows_non_loopback_when_allowlisted_in_external_mode() {
544        let _lock = env_lock().lock().unwrap();
545        let dir = tempfile::tempdir().unwrap();
546        let path = dir.path().join("rrq.toml");
547        let config = r#"
548        [rrq]
549        runner_management_mode = "external"
550        [rrq.runners.python]
551        tcp_host = "10.0.0.1"
552        tcp_port = 9000
553        allowed_hosts = ["10.0.0.1"]
554        "#;
555        fs::write(&path, config).unwrap();
556        let settings = load_toml_settings(Some(path.to_str().unwrap())).unwrap();
557        assert!(settings.runners.contains_key("python"));
558    }
559
560    #[test]
561    fn validate_runner_configs_allows_non_loopback_allowlist_in_managed_mode() {
562        let _lock = env_lock().lock().unwrap();
563        let dir = tempfile::tempdir().unwrap();
564        let path = dir.path().join("rrq.toml");
565        let config = r#"
566        [rrq]
567        [rrq.runners.python]
568        cmd = ["rrq-runner"]
569        tcp_host = "10.0.0.1"
570        tcp_port = 9000
571        allowed_hosts = ["10.0.0.1"]
572        "#;
573        fs::write(&path, config).unwrap();
574        let settings = load_toml_settings(Some(path.to_str().unwrap())).unwrap();
575        assert!(settings.runners.contains_key("python"));
576    }
577
578    #[test]
579    fn validate_runner_configs_rejects_zero_pool_size() {
580        let _lock = env_lock().lock().unwrap();
581        let dir = tempfile::tempdir().unwrap();
582        let path = dir.path().join("rrq.toml");
583        let config = r#"
584        [rrq]
585        [rrq.runners.python]
586        cmd = ["rrq-runner"]
587        tcp_port = 9000
588        pool_size = 0
589        "#;
590        fs::write(&path, config).unwrap();
591        let err = load_toml_settings(Some(path.to_str().unwrap())).unwrap_err();
592        assert!(err.to_string().contains("pool_size"));
593    }
594
595    #[test]
596    fn validate_runner_configs_rejects_zero_max_in_flight() {
597        let _lock = env_lock().lock().unwrap();
598        let dir = tempfile::tempdir().unwrap();
599        let path = dir.path().join("rrq.toml");
600        let config = r#"
601        [rrq]
602        [rrq.runners.python]
603        cmd = ["rrq-runner"]
604        tcp_port = 9000
605        max_in_flight = 0
606        "#;
607        fs::write(&path, config).unwrap();
608        let err = load_toml_settings(Some(path.to_str().unwrap())).unwrap_err();
609        assert!(err.to_string().contains("max_in_flight"));
610    }
611
612    #[test]
613    fn validate_runner_configs_allows_mismatched_default_runner() {
614        let _lock = env_lock().lock().unwrap();
615        let dir = tempfile::tempdir().unwrap();
616        let path = dir.path().join("rrq.toml");
617        let config = r#"
618        [rrq]
619        default_runner_name = "node"
620        [rrq.runners.python]
621        cmd = ["rrq-runner"]
622        tcp_port = 9000
623        "#;
624        fs::write(&path, config).unwrap();
625        let settings = load_toml_settings(Some(path.to_str().unwrap())).unwrap();
626        assert_eq!(settings.default_runner_name, "node");
627        assert!(settings.runners.contains_key("python"));
628    }
629
630    #[test]
631    fn validate_runner_configs_rejects_unknown_field() {
632        let _lock = env_lock().lock().unwrap();
633        let dir = tempfile::tempdir().unwrap();
634        let path = dir.path().join("rrq.toml");
635        let config = r#"
636        [rrq]
637        [rrq.runners.python]
638        cmd = ["rrq-runner"]
639        tcp_port = 9000
640        pool_siz = 4
641        "#;
642        fs::write(&path, config).unwrap();
643        let err = load_toml_settings(Some(path.to_str().unwrap())).unwrap_err();
644        assert!(err.to_string().contains("unknown field"));
645    }
646
647    #[test]
648    fn validate_runner_configs_rejects_port_overflow() {
649        let _lock = env_lock().lock().unwrap();
650        let dir = tempfile::tempdir().unwrap();
651        let path = dir.path().join("rrq.toml");
652        let config = r#"
653        [rrq]
654        [rrq.runners.python]
655        cmd = ["rrq-runner"]
656        tcp_port = 65535
657        pool_size = 2
658        "#;
659        fs::write(&path, config).unwrap();
660        let err = load_toml_settings(Some(path.to_str().unwrap())).unwrap_err();
661        assert!(err.to_string().contains("port range"));
662    }
663
664    #[test]
665    fn validate_runner_configs_uses_default_pool_size_for_port_range() {
666        let _lock = env_lock().lock().unwrap();
667        let dir = tempfile::tempdir().unwrap();
668        let path = dir.path().join("rrq.toml");
669        let config = r#"
670        [rrq]
671        [rrq.runners.python]
672        cmd = ["rrq-runner"]
673        tcp_port = 65535
674        "#;
675        fs::write(&path, config).unwrap();
676        let result = load_toml_settings(Some(path.to_str().unwrap()));
677        let default_pool_size = num_cpus::get();
678        if default_pool_size > 1 {
679            let err = result.unwrap_err();
680            assert!(err.to_string().contains("port range"));
681        } else {
682            assert!(result.is_ok());
683        }
684    }
685
686    #[test]
687    fn validate_runner_configs_rejects_negative_shutdown_term_grace() {
688        let _lock = env_lock().lock().unwrap();
689        let dir = tempfile::tempdir().unwrap();
690        let path = dir.path().join("rrq.toml");
691        let config = r#"
692        [rrq]
693        runner_shutdown_term_grace_seconds = -1
694        [rrq.runners.python]
695        cmd = ["rrq-runner"]
696        tcp_port = 9000
697        "#;
698        fs::write(&path, config).unwrap();
699        let err = load_toml_settings(Some(path.to_str().unwrap())).unwrap_err();
700        assert!(
701            err.to_string()
702                .contains("runner_shutdown_term_grace_seconds")
703        );
704    }
705
706    #[test]
707    fn validate_runner_configs_rejects_oversized_shutdown_term_grace() {
708        let _lock = env_lock().lock().unwrap();
709        let dir = tempfile::tempdir().unwrap();
710        let path = dir.path().join("rrq.toml");
711        let config = r#"
712        [rrq]
713        runner_shutdown_term_grace_seconds = 1e100
714        [rrq.runners.python]
715        cmd = ["rrq-runner"]
716        tcp_port = 9000
717        "#;
718        fs::write(&path, config).unwrap();
719        let err = load_toml_settings(Some(path.to_str().unwrap())).unwrap_err();
720        assert!(err.to_string().contains("too large"));
721    }
722
723    #[test]
724    fn validate_runner_configs_rejects_non_positive_default_job_timeout() {
725        let _lock = env_lock().lock().unwrap();
726        let dir = tempfile::tempdir().unwrap();
727        let path = dir.path().join("rrq.toml");
728        let config = r"
729        [rrq]
730        default_job_timeout_seconds = 0
731        ";
732        fs::write(&path, config).unwrap();
733        let err = load_toml_settings(Some(path.to_str().unwrap())).unwrap_err();
734        assert!(
735            err.to_string()
736                .contains("default_job_timeout_seconds must be positive")
737        );
738    }
739
740    #[test]
741    fn validate_runner_configs_rejects_negative_default_lock_timeout_extension() {
742        let _lock = env_lock().lock().unwrap();
743        let dir = tempfile::tempdir().unwrap();
744        let path = dir.path().join("rrq.toml");
745        let config = r"
746        [rrq]
747        default_lock_timeout_extension_seconds = -1
748        ";
749        fs::write(&path, config).unwrap();
750        let err = load_toml_settings(Some(path.to_str().unwrap())).unwrap_err();
751        assert!(
752            err.to_string()
753                .contains("default_lock_timeout_extension_seconds must be >= 0")
754        );
755    }
756
757    #[test]
758    fn validate_runner_configs_rejects_overflowing_provisional_claim_lock_timeout() {
759        let _lock = env_lock().lock().unwrap();
760        let dir = tempfile::tempdir().unwrap();
761        let path = dir.path().join("rrq.toml");
762        let config = format!(
763            r"
764        [rrq]
765        default_job_timeout_seconds = {}
766        default_lock_timeout_extension_seconds = 1
767        ",
768            i64::MAX
769        );
770        fs::write(&path, config).unwrap();
771        let err = load_toml_settings(Some(path.to_str().unwrap())).unwrap_err();
772        assert!(
773            err.to_string()
774                .contains("provisional claim lock timeout overflow")
775        );
776    }
777}