Skip to main content

runledger_runtime/
config.rs

1use std::str::FromStr;
2use std::time::Duration;
3
/// Fallback for `JOBS_POLL_INTERVAL_MS`, in milliseconds.
const DEFAULT_POLL_INTERVAL_MS: u64 = 500;

/// Fallback for `JOBS_CLAIM_BATCH_SIZE`.
const DEFAULT_CLAIM_BATCH_SIZE: i64 = 16;

/// Fallback for `JOBS_LEASE_TTL_SECONDS`, in seconds.
const DEFAULT_LEASE_TTL_SECONDS: i32 = 60;

/// Fallback for `JOBS_MAX_GLOBAL_CONCURRENCY`.
const DEFAULT_MAX_GLOBAL_CONCURRENCY: usize = 32;

/// Fallback for `JOBS_REAPER_INTERVAL_SECONDS`, in seconds.
const DEFAULT_REAPER_INTERVAL_SECONDS: u64 = 15;

/// Fallback for `JOBS_SCHEDULE_POLL_INTERVAL_SECONDS`, in seconds.
const DEFAULT_SCHEDULE_POLL_INTERVAL_SECONDS: u64 = 30;

/// Fallback for `JOBS_REAPER_RETRY_DELAY_MS`, in milliseconds.
const DEFAULT_REAPER_RETRY_DELAY_MS: i32 = 30_000;
11
/// Settings sourced from the `JOBS_*` environment variables.
///
/// All fields are public, so a value can also be assembled by hand; the
/// floors mentioned on each field are only applied by `JobsConfig::from_env`.
#[derive(Clone, Debug)]
pub struct JobsConfig {
    /// Worker identifier. `from_env` takes it from `JOBS_WORKER_ID`, or
    /// generates a `worker-<uuid>` value when that variable is unset or blank.
    pub worker_id: String,
    /// Built by `from_env` from `JOBS_POLL_INTERVAL_MS` (at least 1 ms).
    pub poll_interval: Duration,
    /// Read by `from_env` from `JOBS_CLAIM_BATCH_SIZE` (at least 1).
    pub claim_batch_size: i64,
    /// Read by `from_env` from `JOBS_LEASE_TTL_SECONDS` (at least 10).
    pub lease_ttl_seconds: i32,
    /// Read by `from_env` from `JOBS_MAX_GLOBAL_CONCURRENCY` (at least 1).
    pub max_global_concurrency: usize,
    /// Built by `from_env` from `JOBS_REAPER_INTERVAL_SECONDS` (at least 1 s).
    pub reaper_interval: Duration,
    /// Built by `from_env` from `JOBS_SCHEDULE_POLL_INTERVAL_SECONDS`
    /// (at least 1 s).
    pub schedule_poll_interval: Duration,
    /// Read by `from_env` from `JOBS_REAPER_RETRY_DELAY_MS` (at least 1000).
    pub reaper_retry_delay_ms: i32,
}
23
24impl JobsConfig {
25    #[must_use]
26    pub fn from_env() -> Self {
27        Self {
28            worker_id: std::env::var("JOBS_WORKER_ID")
29                .ok()
30                .filter(|value| !value.trim().is_empty())
31                .unwrap_or_else(|| format!("worker-{}", uuid::Uuid::now_v7())),
32            poll_interval: Duration::from_millis(
33                parse_env("JOBS_POLL_INTERVAL_MS", DEFAULT_POLL_INTERVAL_MS).max(1),
34            ),
35            claim_batch_size: parse_env("JOBS_CLAIM_BATCH_SIZE", DEFAULT_CLAIM_BATCH_SIZE).max(1),
36            lease_ttl_seconds: parse_env("JOBS_LEASE_TTL_SECONDS", DEFAULT_LEASE_TTL_SECONDS)
37                .max(10),
38            max_global_concurrency: parse_env(
39                "JOBS_MAX_GLOBAL_CONCURRENCY",
40                DEFAULT_MAX_GLOBAL_CONCURRENCY,
41            )
42            .max(1),
43            reaper_interval: Duration::from_secs(
44                parse_env(
45                    "JOBS_REAPER_INTERVAL_SECONDS",
46                    DEFAULT_REAPER_INTERVAL_SECONDS,
47                )
48                .max(1),
49            ),
50            schedule_poll_interval: Duration::from_secs(
51                parse_env(
52                    "JOBS_SCHEDULE_POLL_INTERVAL_SECONDS",
53                    DEFAULT_SCHEDULE_POLL_INTERVAL_SECONDS,
54                )
55                .max(1),
56            ),
57            reaper_retry_delay_ms: parse_env(
58                "JOBS_REAPER_RETRY_DELAY_MS",
59                DEFAULT_REAPER_RETRY_DELAY_MS,
60            )
61            .max(1_000),
62        }
63    }
64}
65
/// Reads the environment variable `name` and parses it as `T`.
///
/// Returns `default` when the variable is not set, is not valid Unicode, or
/// cannot be parsed.
///
/// The value is first parsed exactly as given. If that fails, it is parsed
/// again with leading and trailing whitespace removed, so a value such as
/// `"42\n"` (common when variables come from env files) is honoured instead
/// of silently falling back to the default.
fn parse_env<T>(name: &str, default: T) -> T
where
    T: FromStr,
{
    let Ok(raw) = std::env::var(name) else {
        return default;
    };

    // Verbatim first: anything that parsed before this fallback existed keeps
    // producing the identical result.
    raw.parse::<T>()
        .or_else(|_| raw.trim().parse::<T>())
        .unwrap_or(default)
}
75
#[cfg(test)]
mod tests {
    use std::ffi::OsString;
    use std::sync::{Mutex, MutexGuard, OnceLock};

    use super::*;

    /// Lock held by every test that mutates the environment, so those tests
    /// never overlap even when the harness runs them on several threads.
    static ENV_LOCK: OnceLock<Mutex<()>> = OnceLock::new();

    /// Applies environment overrides for the lifetime of the value and puts
    /// the previous values back on drop.
    #[derive(Debug)]
    struct ScopedEnv {
        /// Keeps `ENV_LOCK` held until after `Drop::drop` has restored the
        /// environment (fields are dropped only once `drop` returns).
        _guard: MutexGuard<'static, ()>,
        /// Values the overridden variables had before `set` ran. Stored as
        /// `OsString` so a non-UTF-8 value is restored rather than deleted.
        prior: Vec<(String, Option<OsString>)>,
    }

    impl ScopedEnv {
        /// Sets (`Some`) or removes (`None`) each listed variable after
        /// recording its current value.
        fn set(overrides: &[(&str, Option<&str>)]) -> Self {
            // A poisoned lock only means another test panicked; the guard is
            // still usable for serialization.
            let guard = ENV_LOCK
                .get_or_init(|| Mutex::new(()))
                .lock()
                .unwrap_or_else(std::sync::PoisonError::into_inner);

            // `var_os` rather than `var`: `var` reports a non-UTF-8 value as
            // an error, which would be recorded as "unset" and cause `drop`
            // to remove the variable instead of restoring it.
            let prior = overrides
                .iter()
                .map(|(key, _)| ((*key).to_owned(), std::env::var_os(key)))
                .collect();

            // SAFETY: env mutation is serialized through ENV_LOCK.
            unsafe {
                for (key, value) in overrides {
                    match value {
                        Some(value) => std::env::set_var(key, value),
                        None => std::env::remove_var(key),
                    }
                }
            }

            Self {
                _guard: guard,
                prior,
            }
        }
    }

    impl Drop for ScopedEnv {
        fn drop(&mut self) {
            // SAFETY: env mutation is serialized through ENV_LOCK.
            unsafe {
                for (key, value) in self.prior.drain(..) {
                    match value {
                        Some(value) => std::env::set_var(&key, value),
                        None => std::env::remove_var(&key),
                    }
                }
            }
        }
    }

    #[test]
    fn from_env_clamps_zero_intervals_to_non_zero_minimum() {
        let _env = ScopedEnv::set(&[
            ("JOBS_POLL_INTERVAL_MS", Some("0")),
            ("JOBS_REAPER_INTERVAL_SECONDS", Some("0")),
            ("JOBS_SCHEDULE_POLL_INTERVAL_SECONDS", Some("0")),
        ]);

        let config = JobsConfig::from_env();
        assert_eq!(config.poll_interval, Duration::from_millis(1));
        assert_eq!(config.reaper_interval, Duration::from_secs(1));
        assert_eq!(config.schedule_poll_interval, Duration::from_secs(1));
    }

    #[test]
    fn from_env_clamps_non_interval_limits_and_falls_back_worker_id() {
        let _env = ScopedEnv::set(&[
            ("JOBS_CLAIM_BATCH_SIZE", Some("0")),
            ("JOBS_LEASE_TTL_SECONDS", Some("1")),
            ("JOBS_MAX_GLOBAL_CONCURRENCY", Some("0")),
            ("JOBS_REAPER_RETRY_DELAY_MS", Some("1")),
            ("JOBS_WORKER_ID", Some("   ")),
        ]);

        let config = JobsConfig::from_env();
        assert_eq!(config.claim_batch_size, 1);
        assert_eq!(config.lease_ttl_seconds, 10);
        assert_eq!(config.max_global_concurrency, 1);
        assert_eq!(config.reaper_retry_delay_ms, 1_000);
        assert!(config.worker_id.starts_with("worker-"));
    }
}