1use std::path::Path;
2use std::time::Duration;
3
4use anyhow::{Context, Result};
5use serde_json::{Map, Value};
6
7use crate::queue::normalize_queue_name;
8use crate::settings::{RRQSettings, RunnerManagementMode};
9use crate::tcp_socket::parse_tcp_socket_with_allowed_hosts;
10
11pub const DEFAULT_CONFIG_FILENAME: &str = "rrq.toml";
12pub const ENV_CONFIG_KEY: &str = "RRQ_CONFIG";
13
14#[must_use]
15pub fn resolve_config_source(config_path: Option<&str>) -> (Option<String>, String) {
16 if let Some(path) = config_path {
17 return (Some(path.to_string()), "--config parameter".to_string());
18 }
19
20 if let Ok(env_path) = std::env::var(ENV_CONFIG_KEY)
21 && !env_path.is_empty()
22 {
23 return (Some(env_path), format!("{ENV_CONFIG_KEY} env var"));
24 }
25
26 let default_path = Path::new(DEFAULT_CONFIG_FILENAME);
27 if default_path.is_file() {
28 return (
29 Some(default_path.to_string_lossy().to_string()),
30 format!("{DEFAULT_CONFIG_FILENAME} in cwd"),
31 );
32 }
33
34 (None, "not found".to_string())
35}
36
37pub fn load_toml_settings(config_path: Option<&str>) -> Result<RRQSettings> {
38 load_toml_settings_with_runner_mode(config_path, None)
39}
40
41pub fn load_toml_settings_with_runner_mode(
42 config_path: Option<&str>,
43 runner_mode_override: Option<RunnerManagementMode>,
44) -> Result<RRQSettings> {
45 dotenvy::dotenv().ok();
46
47 let (path, _) = resolve_config_source(config_path);
48 let path = path.ok_or_else(|| {
49 anyhow::anyhow!("RRQ config not found. Provide --config, set RRQ_CONFIG, or add rrq.toml.")
50 })?;
51
52 let payload = std::fs::read_to_string(&path)
53 .with_context(|| format!("failed to read config at {path}"))?;
54 let toml_value: toml::Value =
55 toml::from_str(&payload).with_context(|| format!("failed to parse TOML at {path}"))?;
56 let mut json_value =
57 serde_json::to_value(toml_value).context("failed to convert TOML to JSON")?;
58
59 json_value = normalize_toml_payload(json_value)?;
60 let env_overrides = env_overrides();
61 let merged = deep_merge(json_value, env_overrides);
62
63 let mut settings: RRQSettings = serde_json::from_value(merged.clone()).map_err(|err| {
64 let hint = diagnose_config_error(&merged, &err);
65 anyhow::anyhow!("invalid RRQ config: {err}{hint}")
66 })?;
67 if let Some(mode) = runner_mode_override {
68 settings.runner_management_mode = mode;
69 }
70 validate_runner_configs(&settings)?;
71 Ok(settings)
72}
73
74fn normalize_toml_payload(mut payload: Value) -> Result<Value> {
75 if let Value::Object(mut map) = payload {
76 if let Some(rrq_value) = map.remove("rrq") {
77 payload = rrq_value;
78 } else {
79 payload = Value::Object(map);
80 }
81 }
82
83 if let Value::Object(mut map) = payload {
84 if let Some(routing) = map.remove("routing") {
85 map.insert("runner_routes".to_string(), routing);
86 }
87 map.remove("worker_concurrency");
88 normalize_queue_fields(&mut map);
89 return Ok(Value::Object(map));
90 }
91
92 Err(anyhow::anyhow!("RRQ config must be a TOML table"))
93}
94
95fn normalize_queue_fields(map: &mut Map<String, Value>) {
96 if let Some(Value::String(queue_name)) = map.get_mut("default_queue_name") {
97 *queue_name = normalize_queue_name(queue_name);
98 }
99
100 if let Some(Value::Object(routes)) = map.get_mut("runner_routes") {
101 let mut normalized = Map::new();
102 for (queue_name, runner_name) in std::mem::take(routes) {
103 normalized.insert(normalize_queue_name(&queue_name), runner_name);
104 }
105 *routes = normalized;
106 }
107}
108
109fn env_overrides() -> Value {
110 let mut payload = Map::new();
111
112 set_env_string(&mut payload, "redis_dsn", "RRQ_REDIS_DSN");
113 set_env_bool(
114 &mut payload,
115 "capture_runner_output",
116 "RRQ_CAPTURE_RUNNER_OUTPUT",
117 );
118
119 Value::Object(payload)
120}
121
122fn set_env_string(map: &mut Map<String, Value>, key: &str, env: &str) {
123 if let Ok(value) = std::env::var(env)
124 && !value.is_empty()
125 {
126 map.insert(key.to_string(), Value::String(value));
127 }
128}
129
130fn set_env_bool(map: &mut Map<String, Value>, key: &str, env: &str) {
131 if let Ok(value) = std::env::var(env)
132 && !value.is_empty()
133 {
134 let b = matches!(value.as_str(), "1" | "true" | "yes");
135 map.insert(key.to_string(), Value::Bool(b));
136 }
137}
138
139fn deep_merge(base: Value, overlay: Value) -> Value {
140 match (base, overlay) {
141 (Value::Object(mut base_map), Value::Object(overlay_map)) => {
142 for (key, value) in overlay_map {
143 let entry = base_map.remove(&key);
144 let merged = match entry {
145 Some(existing) => deep_merge(existing, value),
146 None => value,
147 };
148 base_map.insert(key, merged);
149 }
150 Value::Object(base_map)
151 }
152 (_, overlay_value) => overlay_value,
153 }
154}
155
156fn runner_tcp_host_or_default(raw: Option<&str>) -> String {
157 match raw.map(str::trim).filter(|s| !s.is_empty()) {
158 Some(value) => value.to_string(),
159 None => "127.0.0.1".to_string(),
160 }
161}
162
163fn runner_tcp_socket_for_validation(host: &str, port: u16) -> String {
164 if host.contains(':') && !host.starts_with('[') && !host.ends_with(']') {
165 format!("[{host}]:{port}")
166 } else {
167 format!("{host}:{port}")
168 }
169}
170
171fn diagnose_config_error(config: &Value, err: &serde_json::Error) -> String {
172 let err_msg = err.to_string().to_lowercase();
173
174 if err_msg.contains("unknown field")
176 && let Some(runners) = config.get("runners").and_then(|v| v.as_object())
177 {
178 let valid_fields = [
179 "type",
180 "cmd",
181 "pool_size",
182 "max_in_flight",
183 "env",
184 "cwd",
185 "tcp_host",
186 "tcp_port",
187 "allowed_hosts",
188 "response_timeout_seconds",
189 ];
190 for (name, runner) in runners {
191 if let Some(obj) = runner.as_object() {
192 for key in obj.keys() {
193 if !valid_fields.contains(&key.as_str()) {
194 return format!(
195 "\n\nHint: runner '{name}' has unknown field '{key}'. Valid fields are: {}",
196 valid_fields.join(", ")
197 );
198 }
199 }
200 }
201 }
202 }
203
204 if err_msg.contains("invalid type")
206 && let Some(runners) = config.get("runners").and_then(|v| v.as_object())
207 {
208 for (name, runner) in runners {
209 if let Some(obj) = runner.as_object() {
210 if let Some(pool_size) = obj.get("pool_size")
211 && !pool_size.is_u64()
212 {
213 return format!(
214 "\n\nHint: runner '{name}' has invalid pool_size - must be a positive integer, got: {pool_size}"
215 );
216 }
217 if let Some(max_in_flight) = obj.get("max_in_flight")
218 && !max_in_flight.is_u64()
219 {
220 return format!(
221 "\n\nHint: runner '{name}' has invalid max_in_flight - must be a positive integer, got: {max_in_flight}"
222 );
223 }
224 if let Some(cmd) = obj.get("cmd")
225 && !cmd.is_array()
226 {
227 return format!(
228 "\n\nHint: runner '{name}' has invalid cmd - must be an array of strings, got: {cmd}"
229 );
230 }
231 }
232 }
233 }
234
235 String::new()
236}
237
238fn validate_runner_configs(settings: &RRQSettings) -> Result<()> {
239 if settings.default_job_timeout_seconds <= 0 {
240 return Err(anyhow::anyhow!(
241 "default_job_timeout_seconds must be positive"
242 ));
243 }
244 if settings.default_lock_timeout_extension_seconds < 0 {
245 return Err(anyhow::anyhow!(
246 "default_lock_timeout_extension_seconds must be >= 0"
247 ));
248 }
249 settings
250 .default_job_timeout_seconds
251 .checked_add(settings.default_lock_timeout_extension_seconds)
252 .and_then(|sum| sum.checked_mul(1000))
253 .ok_or_else(|| anyhow::anyhow!("provisional claim lock timeout overflow"))?;
254
255 if !settings.runner_shutdown_term_grace_seconds.is_finite()
256 || settings.runner_shutdown_term_grace_seconds < 0.0
257 {
258 return Err(anyhow::anyhow!(
259 "runner_shutdown_term_grace_seconds must be a finite number >= 0"
260 ));
261 }
262 if settings.runner_shutdown_term_grace_seconds > Duration::MAX.as_secs_f64() {
263 return Err(anyhow::anyhow!(
264 "runner_shutdown_term_grace_seconds is too large (max {})",
265 Duration::MAX.as_secs_f64()
266 ));
267 }
268
269 if settings.runners.is_empty() {
270 return Ok(()); }
272
273 let default_pool_size = num_cpus::get();
274 let external_mode = settings.runner_management_mode == RunnerManagementMode::External;
275 for (name, config) in &settings.runners {
276 if let Some(pool_size) = config.pool_size
278 && pool_size == 0
279 {
280 return Err(anyhow::anyhow!(
281 "runner '{name}' has invalid pool_size: 0 - must be a positive integer"
282 ));
283 }
284
285 if let Some(max_in_flight) = config.max_in_flight
287 && max_in_flight == 0
288 {
289 return Err(anyhow::anyhow!(
290 "runner '{name}' has invalid max_in_flight: 0 - must be a positive integer"
291 ));
292 }
293
294 if config.tcp_port.is_none() {
296 return Err(anyhow::anyhow!(
297 "runner '{name}' is missing required field 'tcp_port' (e.g., 9000)"
298 ));
299 }
300
301 if !external_mode && config.cmd.is_none() {
302 return Err(anyhow::anyhow!(
303 "runner '{name}' is missing required field 'cmd' (e.g., [\"rrq-runner\", \"--settings\", \"myapp.settings\"])"
304 ));
305 }
306
307 if let Some(port) = config.tcp_port {
309 if port == 0 {
310 return Err(anyhow::anyhow!(
311 "runner '{name}' has invalid tcp_port: 0 - must be in 1..=65535"
312 ));
313 }
314
315 let host = runner_tcp_host_or_default(config.tcp_host.as_deref());
316 if external_mode {
317 let socket = runner_tcp_socket_for_validation(&host, port);
318 parse_tcp_socket_with_allowed_hosts(
319 &socket,
320 config.allowed_hosts.as_deref().unwrap_or(&[]),
321 )
322 .with_context(|| format!("runner '{name}' has invalid endpoint '{socket}'"))?;
323 }
324
325 let pool_size = if external_mode {
327 1
328 } else {
329 config.pool_size.unwrap_or(default_pool_size)
330 };
331 let port_span = u32::try_from(pool_size.saturating_sub(1)).map_err(|_| {
332 anyhow::anyhow!("runner '{name}' pool_size is too large: {pool_size}")
333 })?;
334 let max_port = u32::from(port) + port_span;
335 if max_port > u32::from(u16::MAX) {
336 return Err(anyhow::anyhow!(
337 "runner '{name}' tcp_port range insufficient for pool_size {pool_size} \
338 (port {} + {} would exceed 65535)",
339 port,
340 pool_size - 1
341 ));
342 }
343 }
344 }
345
346 Ok(())
347}
348
349#[cfg(test)]
350#[allow(unsafe_code)] mod tests {
352 use super::*;
353 use std::fs;
354 use std::sync::{Mutex, OnceLock};
355 use uuid::Uuid;
356
357 static ENV_LOCK: OnceLock<Mutex<()>> = OnceLock::new();
358
359 fn env_lock() -> &'static Mutex<()> {
360 ENV_LOCK.get_or_init(|| Mutex::new(()))
361 }
362
363 #[test]
364 fn load_toml_settings_merges_env_and_normalizes_fields() {
365 let _lock = env_lock().lock().unwrap();
366 unsafe {
367 std::env::set_var("RRQ_REDIS_DSN", "redis://localhost:6379/10");
368 }
369
370 let dir = tempfile::tempdir().unwrap();
371 let path = dir.path().join("rrq.toml");
372 let config = r#"
373 [rrq]
374 redis_dsn = "redis://localhost:6379/9"
375 default_queue_name = "from_toml"
376 [rrq.routing]
377 alpha = "python"
378 "#;
379 fs::write(&path, config).unwrap();
380
381 let settings = load_toml_settings(Some(path.to_str().unwrap())).unwrap();
382 assert_eq!(settings.redis_dsn, "redis://localhost:6379/10");
383 assert_eq!(settings.default_queue_name, "rrq:queue:from_toml");
384 assert_eq!(
385 settings.runner_routes.get("rrq:queue:alpha"),
386 Some(&"python".to_string())
387 );
388 assert!(!settings.runner_routes.contains_key("alpha"));
389 }
390
391 #[test]
392 fn resolve_config_source_prefers_explicit_path() {
393 let (path, source) = resolve_config_source(Some("custom.toml"));
394 assert_eq!(path, Some("custom.toml".to_string()));
395 assert_eq!(source, "--config parameter");
396 }
397
398 #[test]
399 fn resolve_config_source_falls_back_to_env() {
400 let _lock = env_lock().lock().unwrap();
401 let value = format!("rrq-{}.toml", Uuid::new_v4());
402 unsafe {
403 std::env::set_var(ENV_CONFIG_KEY, &value);
404 }
405 let (path, source) = resolve_config_source(None);
406 assert_eq!(path, Some(value));
407 assert!(source.contains(ENV_CONFIG_KEY));
408 }
409
410 #[test]
411 fn validate_runner_configs_accepts_valid_config() {
412 let _lock = env_lock().lock().unwrap();
413 let dir = tempfile::tempdir().unwrap();
414 let path = dir.path().join("rrq.toml");
415 let config = r#"
416 [rrq]
417 default_runner_name = "python"
418 [rrq.runners.python]
419 cmd = ["rrq-runner", "--settings", "myapp.settings"]
420 tcp_port = 9000
421 pool_size = 2
422 max_in_flight = 10
423 "#;
424 fs::write(&path, config).unwrap();
425 let settings = load_toml_settings(Some(path.to_str().unwrap())).unwrap();
426 assert_eq!(settings.runners.len(), 1);
427 assert!(settings.runners.contains_key("python"));
428 }
429
430 #[test]
431 fn validate_runner_configs_rejects_missing_tcp_port() {
432 let _lock = env_lock().lock().unwrap();
433 let dir = tempfile::tempdir().unwrap();
434 let path = dir.path().join("rrq.toml");
435 let config = r#"
436 [rrq]
437 [rrq.runners.python]
438 cmd = ["rrq-runner"]
439 "#;
440 fs::write(&path, config).unwrap();
441 let err = load_toml_settings(Some(path.to_str().unwrap())).unwrap_err();
442 assert!(err.to_string().contains("tcp_port"));
443 }
444
445 #[test]
446 fn validate_runner_configs_rejects_missing_cmd() {
447 let _lock = env_lock().lock().unwrap();
448 let dir = tempfile::tempdir().unwrap();
449 let path = dir.path().join("rrq.toml");
450 let config = r"
451 [rrq]
452 [rrq.runners.python]
453 tcp_port = 9000
454 ";
455 fs::write(&path, config).unwrap();
456 let err = load_toml_settings(Some(path.to_str().unwrap())).unwrap_err();
457 assert!(err.to_string().contains("cmd"));
458 }
459
460 #[test]
461 fn validate_runner_configs_allows_missing_cmd_in_external_mode() {
462 let _lock = env_lock().lock().unwrap();
463 let dir = tempfile::tempdir().unwrap();
464 let path = dir.path().join("rrq.toml");
465 let config = r#"
466 [rrq]
467 runner_management_mode = "external"
468 [rrq.runners.python]
469 tcp_port = 9000
470 "#;
471 fs::write(&path, config).unwrap();
472 let settings = load_toml_settings(Some(path.to_str().unwrap())).unwrap();
473 assert_eq!(
474 settings.runner_management_mode,
475 RunnerManagementMode::External
476 );
477 assert!(settings.runners.contains_key("python"));
478 }
479
480 #[test]
481 fn load_toml_settings_with_runner_mode_allows_external_override_missing_cmd() {
482 let _lock = env_lock().lock().unwrap();
483 let dir = tempfile::tempdir().unwrap();
484 let path = dir.path().join("rrq.toml");
485 let config = r"
486 [rrq]
487 [rrq.runners.python]
488 tcp_port = 9000
489 ";
490 fs::write(&path, config).unwrap();
491 let settings = load_toml_settings_with_runner_mode(
492 Some(path.to_str().unwrap()),
493 Some(RunnerManagementMode::External),
494 )
495 .unwrap();
496 assert_eq!(
497 settings.runner_management_mode,
498 RunnerManagementMode::External
499 );
500 assert!(settings.runners.contains_key("python"));
501 }
502
503 #[test]
504 fn load_toml_settings_with_runner_mode_managed_override_requires_cmd() {
505 let _lock = env_lock().lock().unwrap();
506 let dir = tempfile::tempdir().unwrap();
507 let path = dir.path().join("rrq.toml");
508 let config = r#"
509 [rrq]
510 runner_management_mode = "external"
511 [rrq.runners.python]
512 tcp_port = 9000
513 "#;
514 fs::write(&path, config).unwrap();
515 let err = load_toml_settings_with_runner_mode(
516 Some(path.to_str().unwrap()),
517 Some(RunnerManagementMode::Managed),
518 )
519 .unwrap_err();
520 assert!(err.to_string().contains("cmd"));
521 }
522
523 #[test]
524 fn validate_runner_configs_rejects_non_loopback_in_external_without_allowlist() {
525 let _lock = env_lock().lock().unwrap();
526 let dir = tempfile::tempdir().unwrap();
527 let path = dir.path().join("rrq.toml");
528 let config = r#"
529 [rrq]
530 runner_management_mode = "external"
531 [rrq.runners.python]
532 tcp_host = "10.0.0.1"
533 tcp_port = 9000
534 "#;
535 fs::write(&path, config).unwrap();
536 let err = load_toml_settings(Some(path.to_str().unwrap())).unwrap_err();
537 let full_err = format!("{err:?}");
539 assert!(full_err.contains("loopback"), "error was: {full_err}");
540 }
541
542 #[test]
543 fn validate_runner_configs_allows_non_loopback_when_allowlisted_in_external_mode() {
544 let _lock = env_lock().lock().unwrap();
545 let dir = tempfile::tempdir().unwrap();
546 let path = dir.path().join("rrq.toml");
547 let config = r#"
548 [rrq]
549 runner_management_mode = "external"
550 [rrq.runners.python]
551 tcp_host = "10.0.0.1"
552 tcp_port = 9000
553 allowed_hosts = ["10.0.0.1"]
554 "#;
555 fs::write(&path, config).unwrap();
556 let settings = load_toml_settings(Some(path.to_str().unwrap())).unwrap();
557 assert!(settings.runners.contains_key("python"));
558 }
559
560 #[test]
561 fn validate_runner_configs_allows_non_loopback_allowlist_in_managed_mode() {
562 let _lock = env_lock().lock().unwrap();
563 let dir = tempfile::tempdir().unwrap();
564 let path = dir.path().join("rrq.toml");
565 let config = r#"
566 [rrq]
567 [rrq.runners.python]
568 cmd = ["rrq-runner"]
569 tcp_host = "10.0.0.1"
570 tcp_port = 9000
571 allowed_hosts = ["10.0.0.1"]
572 "#;
573 fs::write(&path, config).unwrap();
574 let settings = load_toml_settings(Some(path.to_str().unwrap())).unwrap();
575 assert!(settings.runners.contains_key("python"));
576 }
577
578 #[test]
579 fn validate_runner_configs_rejects_zero_pool_size() {
580 let _lock = env_lock().lock().unwrap();
581 let dir = tempfile::tempdir().unwrap();
582 let path = dir.path().join("rrq.toml");
583 let config = r#"
584 [rrq]
585 [rrq.runners.python]
586 cmd = ["rrq-runner"]
587 tcp_port = 9000
588 pool_size = 0
589 "#;
590 fs::write(&path, config).unwrap();
591 let err = load_toml_settings(Some(path.to_str().unwrap())).unwrap_err();
592 assert!(err.to_string().contains("pool_size"));
593 }
594
595 #[test]
596 fn validate_runner_configs_rejects_zero_max_in_flight() {
597 let _lock = env_lock().lock().unwrap();
598 let dir = tempfile::tempdir().unwrap();
599 let path = dir.path().join("rrq.toml");
600 let config = r#"
601 [rrq]
602 [rrq.runners.python]
603 cmd = ["rrq-runner"]
604 tcp_port = 9000
605 max_in_flight = 0
606 "#;
607 fs::write(&path, config).unwrap();
608 let err = load_toml_settings(Some(path.to_str().unwrap())).unwrap_err();
609 assert!(err.to_string().contains("max_in_flight"));
610 }
611
612 #[test]
613 fn validate_runner_configs_allows_mismatched_default_runner() {
614 let _lock = env_lock().lock().unwrap();
615 let dir = tempfile::tempdir().unwrap();
616 let path = dir.path().join("rrq.toml");
617 let config = r#"
618 [rrq]
619 default_runner_name = "node"
620 [rrq.runners.python]
621 cmd = ["rrq-runner"]
622 tcp_port = 9000
623 "#;
624 fs::write(&path, config).unwrap();
625 let settings = load_toml_settings(Some(path.to_str().unwrap())).unwrap();
626 assert_eq!(settings.default_runner_name, "node");
627 assert!(settings.runners.contains_key("python"));
628 }
629
630 #[test]
631 fn validate_runner_configs_rejects_unknown_field() {
632 let _lock = env_lock().lock().unwrap();
633 let dir = tempfile::tempdir().unwrap();
634 let path = dir.path().join("rrq.toml");
635 let config = r#"
636 [rrq]
637 [rrq.runners.python]
638 cmd = ["rrq-runner"]
639 tcp_port = 9000
640 pool_siz = 4
641 "#;
642 fs::write(&path, config).unwrap();
643 let err = load_toml_settings(Some(path.to_str().unwrap())).unwrap_err();
644 assert!(err.to_string().contains("unknown field"));
645 }
646
647 #[test]
648 fn validate_runner_configs_rejects_port_overflow() {
649 let _lock = env_lock().lock().unwrap();
650 let dir = tempfile::tempdir().unwrap();
651 let path = dir.path().join("rrq.toml");
652 let config = r#"
653 [rrq]
654 [rrq.runners.python]
655 cmd = ["rrq-runner"]
656 tcp_port = 65535
657 pool_size = 2
658 "#;
659 fs::write(&path, config).unwrap();
660 let err = load_toml_settings(Some(path.to_str().unwrap())).unwrap_err();
661 assert!(err.to_string().contains("port range"));
662 }
663
664 #[test]
665 fn validate_runner_configs_uses_default_pool_size_for_port_range() {
666 let _lock = env_lock().lock().unwrap();
667 let dir = tempfile::tempdir().unwrap();
668 let path = dir.path().join("rrq.toml");
669 let config = r#"
670 [rrq]
671 [rrq.runners.python]
672 cmd = ["rrq-runner"]
673 tcp_port = 65535
674 "#;
675 fs::write(&path, config).unwrap();
676 let result = load_toml_settings(Some(path.to_str().unwrap()));
677 let default_pool_size = num_cpus::get();
678 if default_pool_size > 1 {
679 let err = result.unwrap_err();
680 assert!(err.to_string().contains("port range"));
681 } else {
682 assert!(result.is_ok());
683 }
684 }
685
686 #[test]
687 fn validate_runner_configs_rejects_negative_shutdown_term_grace() {
688 let _lock = env_lock().lock().unwrap();
689 let dir = tempfile::tempdir().unwrap();
690 let path = dir.path().join("rrq.toml");
691 let config = r#"
692 [rrq]
693 runner_shutdown_term_grace_seconds = -1
694 [rrq.runners.python]
695 cmd = ["rrq-runner"]
696 tcp_port = 9000
697 "#;
698 fs::write(&path, config).unwrap();
699 let err = load_toml_settings(Some(path.to_str().unwrap())).unwrap_err();
700 assert!(
701 err.to_string()
702 .contains("runner_shutdown_term_grace_seconds")
703 );
704 }
705
706 #[test]
707 fn validate_runner_configs_rejects_oversized_shutdown_term_grace() {
708 let _lock = env_lock().lock().unwrap();
709 let dir = tempfile::tempdir().unwrap();
710 let path = dir.path().join("rrq.toml");
711 let config = r#"
712 [rrq]
713 runner_shutdown_term_grace_seconds = 1e100
714 [rrq.runners.python]
715 cmd = ["rrq-runner"]
716 tcp_port = 9000
717 "#;
718 fs::write(&path, config).unwrap();
719 let err = load_toml_settings(Some(path.to_str().unwrap())).unwrap_err();
720 assert!(err.to_string().contains("too large"));
721 }
722
723 #[test]
724 fn validate_runner_configs_rejects_non_positive_default_job_timeout() {
725 let _lock = env_lock().lock().unwrap();
726 let dir = tempfile::tempdir().unwrap();
727 let path = dir.path().join("rrq.toml");
728 let config = r"
729 [rrq]
730 default_job_timeout_seconds = 0
731 ";
732 fs::write(&path, config).unwrap();
733 let err = load_toml_settings(Some(path.to_str().unwrap())).unwrap_err();
734 assert!(
735 err.to_string()
736 .contains("default_job_timeout_seconds must be positive")
737 );
738 }
739
740 #[test]
741 fn validate_runner_configs_rejects_negative_default_lock_timeout_extension() {
742 let _lock = env_lock().lock().unwrap();
743 let dir = tempfile::tempdir().unwrap();
744 let path = dir.path().join("rrq.toml");
745 let config = r"
746 [rrq]
747 default_lock_timeout_extension_seconds = -1
748 ";
749 fs::write(&path, config).unwrap();
750 let err = load_toml_settings(Some(path.to_str().unwrap())).unwrap_err();
751 assert!(
752 err.to_string()
753 .contains("default_lock_timeout_extension_seconds must be >= 0")
754 );
755 }
756
757 #[test]
758 fn validate_runner_configs_rejects_overflowing_provisional_claim_lock_timeout() {
759 let _lock = env_lock().lock().unwrap();
760 let dir = tempfile::tempdir().unwrap();
761 let path = dir.path().join("rrq.toml");
762 let config = format!(
763 r"
764 [rrq]
765 default_job_timeout_seconds = {}
766 default_lock_timeout_extension_seconds = 1
767 ",
768 i64::MAX
769 );
770 fs::write(&path, config).unwrap();
771 let err = load_toml_settings(Some(path.to_str().unwrap())).unwrap_err();
772 assert!(
773 err.to_string()
774 .contains("provisional claim lock timeout overflow")
775 );
776 }
777}