runledger-runtime 0.1.1

Async worker, scheduler, and reaper runtime for the Runledger job system
Documentation
use std::str::FromStr;
use std::time::Duration;

const DEFAULT_POLL_INTERVAL_MS: u64 = 500;
const DEFAULT_CLAIM_BATCH_SIZE: i64 = 16;
const DEFAULT_LEASE_TTL_SECONDS: i32 = 60;
const DEFAULT_MAX_GLOBAL_CONCURRENCY: usize = 32;
const DEFAULT_REAPER_INTERVAL_SECONDS: u64 = 15;
const DEFAULT_SCHEDULE_POLL_INTERVAL_SECONDS: u64 = 30;
const DEFAULT_REAPER_RETRY_DELAY_MS: i32 = 30_000;

#[derive(Debug, Clone)]
pub struct JobsConfig {
    pub worker_id: String,
    pub poll_interval: Duration,
    pub claim_batch_size: i64,
    pub lease_ttl_seconds: i32,
    pub max_global_concurrency: usize,
    pub reaper_interval: Duration,
    pub schedule_poll_interval: Duration,
    pub reaper_retry_delay_ms: i32,
}

impl JobsConfig {
    #[must_use]
    pub fn from_env() -> Self {
        Self {
            worker_id: std::env::var("JOBS_WORKER_ID")
                .ok()
                .filter(|value| !value.trim().is_empty())
                .unwrap_or_else(|| format!("worker-{}", uuid::Uuid::now_v7())),
            poll_interval: Duration::from_millis(
                parse_env("JOBS_POLL_INTERVAL_MS", DEFAULT_POLL_INTERVAL_MS).max(1),
            ),
            claim_batch_size: parse_env("JOBS_CLAIM_BATCH_SIZE", DEFAULT_CLAIM_BATCH_SIZE).max(1),
            lease_ttl_seconds: parse_env("JOBS_LEASE_TTL_SECONDS", DEFAULT_LEASE_TTL_SECONDS)
                .max(10),
            max_global_concurrency: parse_env(
                "JOBS_MAX_GLOBAL_CONCURRENCY",
                DEFAULT_MAX_GLOBAL_CONCURRENCY,
            )
            .max(1),
            reaper_interval: Duration::from_secs(
                parse_env(
                    "JOBS_REAPER_INTERVAL_SECONDS",
                    DEFAULT_REAPER_INTERVAL_SECONDS,
                )
                .max(1),
            ),
            schedule_poll_interval: Duration::from_secs(
                parse_env(
                    "JOBS_SCHEDULE_POLL_INTERVAL_SECONDS",
                    DEFAULT_SCHEDULE_POLL_INTERVAL_SECONDS,
                )
                .max(1),
            ),
            reaper_retry_delay_ms: parse_env(
                "JOBS_REAPER_RETRY_DELAY_MS",
                DEFAULT_REAPER_RETRY_DELAY_MS,
            )
            .max(1_000),
        }
    }
}

fn parse_env<T>(name: &str, default: T) -> T
where
    T: FromStr,
{
    std::env::var(name)
        .ok()
        .and_then(|value| value.parse::<T>().ok())
        .unwrap_or(default)
}

#[cfg(test)]
mod tests {
    use std::sync::{Mutex, OnceLock};

    use super::*;

    static ENV_LOCK: OnceLock<Mutex<()>> = OnceLock::new();

    #[derive(Debug)]
    struct ScopedEnv {
        _guard: std::sync::MutexGuard<'static, ()>,
        prior: Vec<(String, Option<String>)>,
    }

    impl ScopedEnv {
        fn set(overrides: &[(&str, Option<&str>)]) -> Self {
            let guard = ENV_LOCK
                .get_or_init(|| Mutex::new(()))
                .lock()
                .unwrap_or_else(std::sync::PoisonError::into_inner);

            let prior = overrides
                .iter()
                .map(|(key, _)| (key.to_string(), std::env::var(key).ok()))
                .collect();

            // SAFETY: env mutation is serialized through ENV_LOCK.
            unsafe {
                for (key, value) in overrides {
                    match value {
                        Some(value) => std::env::set_var(key, value),
                        None => std::env::remove_var(key),
                    }
                }
            }

            Self {
                _guard: guard,
                prior,
            }
        }
    }

    impl Drop for ScopedEnv {
        fn drop(&mut self) {
            // SAFETY: env mutation is serialized through ENV_LOCK.
            unsafe {
                for (key, value) in self.prior.drain(..) {
                    match value {
                        Some(value) => std::env::set_var(&key, value),
                        None => std::env::remove_var(&key),
                    }
                }
            }
        }
    }

    #[test]
    fn from_env_clamps_zero_intervals_to_non_zero_minimum() {
        let _env = ScopedEnv::set(&[
            ("JOBS_POLL_INTERVAL_MS", Some("0")),
            ("JOBS_REAPER_INTERVAL_SECONDS", Some("0")),
            ("JOBS_SCHEDULE_POLL_INTERVAL_SECONDS", Some("0")),
        ]);

        let config = JobsConfig::from_env();
        assert_eq!(config.poll_interval, Duration::from_millis(1));
        assert_eq!(config.reaper_interval, Duration::from_secs(1));
        assert_eq!(config.schedule_poll_interval, Duration::from_secs(1));
    }

    #[test]
    fn from_env_clamps_non_interval_limits_and_falls_back_worker_id() {
        let _env = ScopedEnv::set(&[
            ("JOBS_CLAIM_BATCH_SIZE", Some("0")),
            ("JOBS_LEASE_TTL_SECONDS", Some("1")),
            ("JOBS_MAX_GLOBAL_CONCURRENCY", Some("0")),
            ("JOBS_REAPER_RETRY_DELAY_MS", Some("1")),
            ("JOBS_WORKER_ID", Some("   ")),
        ]);

        let config = JobsConfig::from_env();
        assert_eq!(config.claim_batch_size, 1);
        assert_eq!(config.lease_ttl_seconds, 10);
        assert_eq!(config.max_global_concurrency, 1);
        assert_eq!(config.reaper_retry_delay_ms, 1_000);
        assert!(config.worker_id.starts_with("worker-"));
    }
}