Skip to main content

dag_executor/utils/
config.rs

1//! Runtime configuration.
2
3use serde::{Deserialize, Serialize};
4use std::path::PathBuf;
5use std::time::Duration;
6
7/// Top-level executor configuration.
8///
9/// All fields have sensible defaults (see [`Config::default`]) so the executor
10/// can run with zero configuration. Values can also be loaded from a JSON file
11/// or overridden from the environment.
12#[derive(Debug, Clone, Serialize, Deserialize)]
13#[serde(default)]
14pub struct Config {
15    /// Maximum number of tasks executing concurrently.
16    pub max_concurrency: usize,
17    /// Directory used by file-based storage.
18    pub storage_dir: PathBuf,
19    /// Whether to persist task state to disk during execution.
20    pub persist_state: bool,
21    /// LRU cache capacity (entries) sitting in front of storage.
22    pub cache_capacity: usize,
23    /// Default per-task timeout. `None` disables the timeout.
24    #[serde(with = "humantime_opt")]
25    pub task_timeout: Option<Duration>,
26    /// Default maximum execution attempts per task (1 = no retries).
27    pub max_attempts: u32,
28    /// Whether the metrics endpoint/registry is enabled.
29    pub metrics_enabled: bool,
30    /// Log level filter (e.g. "info", "debug", "dag_executor=trace").
31    pub log_level: String,
32}
33
34impl Default for Config {
35    fn default() -> Self {
36        Config {
37            max_concurrency: 1024,
38            storage_dir: PathBuf::from(".dag-executor"),
39            persist_state: true,
40            cache_capacity: 4096,
41            task_timeout: Some(Duration::from_secs(300)),
42            max_attempts: 3,
43            metrics_enabled: cfg!(feature = "metrics"),
44            log_level: "info".to_string(),
45        }
46    }
47}
48
49impl Config {
50    /// Load configuration from a JSON file, falling back to defaults for any
51    /// missing fields.
52    pub fn from_file(path: impl Into<PathBuf>) -> anyhow::Result<Self> {
53        let path = path.into();
54        let raw = std::fs::read_to_string(&path)
55            .map_err(|e| anyhow::anyhow!("reading config {}: {e}", path.display()))?;
56        let cfg: Config = serde_json::from_str(&raw)?;
57        Ok(cfg)
58    }
59
60    /// Apply overrides from environment variables.
61    ///
62    /// Recognized: `DAG_MAX_CONCURRENCY`, `DAG_STORAGE_DIR`, `DAG_MAX_ATTEMPTS`,
63    /// `DAG_LOG_LEVEL`, `DAG_PERSIST_STATE`.
64    pub fn with_env_overrides(mut self) -> Self {
65        if let Ok(v) = std::env::var("DAG_MAX_CONCURRENCY") {
66            if let Ok(n) = v.parse() {
67                self.max_concurrency = n;
68            }
69        }
70        if let Ok(v) = std::env::var("DAG_STORAGE_DIR") {
71            self.storage_dir = PathBuf::from(v);
72        }
73        if let Ok(v) = std::env::var("DAG_MAX_ATTEMPTS") {
74            if let Ok(n) = v.parse() {
75                self.max_attempts = n;
76            }
77        }
78        if let Ok(v) = std::env::var("DAG_LOG_LEVEL") {
79            self.log_level = v;
80        }
81        if let Ok(v) = std::env::var("DAG_PERSIST_STATE") {
82            self.persist_state = matches!(v.as_str(), "1" | "true" | "yes");
83        }
84        self
85    }
86}
87
88/// Serde helper for `Option<Duration>` expressed as whole seconds.
89mod humantime_opt {
90    use serde::{Deserialize, Deserializer, Serialize, Serializer};
91    use std::time::Duration;
92
93    pub fn serialize<S: Serializer>(value: &Option<Duration>, ser: S) -> Result<S::Ok, S::Error> {
94        value.map(|d| d.as_secs()).serialize(ser)
95    }
96
97    pub fn deserialize<'de, D: Deserializer<'de>>(de: D) -> Result<Option<Duration>, D::Error> {
98        let secs = Option::<u64>::deserialize(de)?;
99        Ok(secs.map(Duration::from_secs))
100    }
101}