dag-executor 0.1.0

A production-ready DAG executor with state management and advanced patterns
Documentation
//! Runtime configuration.

use serde::{Deserialize, Serialize};
use std::path::PathBuf;
use std::time::Duration;

/// Top-level executor configuration.
///
/// All fields have sensible defaults (see [`Config::default`]) so the executor
/// can run with zero configuration. Values can also be loaded from a JSON file
/// ([`Config::from_file`]) or overridden from the environment
/// ([`Config::with_env_overrides`]).
///
/// The container-level `#[serde(default)]` means any field missing from a
/// deserialized document takes its value from [`Config::default`], so partial
/// config files are accepted.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct Config {
    /// Maximum number of tasks executing concurrently.
    pub max_concurrency: usize,
    /// Directory used by file-based storage.
    pub storage_dir: PathBuf,
    /// Whether to persist task state to disk during execution.
    pub persist_state: bool,
    /// LRU cache capacity (entries) sitting in front of storage.
    pub cache_capacity: usize,
    /// Default per-task timeout. `None` disables the timeout.
    ///
    /// Serialized as an integer number of seconds, or `null` for `None`.
    #[serde(with = "humantime_opt")]
    pub task_timeout: Option<Duration>,
    /// Default maximum execution attempts per task (1 = no retries).
    pub max_attempts: u32,
    /// Whether the metrics endpoint/registry is enabled.
    pub metrics_enabled: bool,
    /// Log level filter (e.g. "info", "debug", "dag_executor=trace").
    pub log_level: String,
}

impl Default for Config {
    fn default() -> Self {
        Config {
            max_concurrency: 1024,
            storage_dir: PathBuf::from(".dag-executor"),
            persist_state: true,
            cache_capacity: 4096,
            task_timeout: Some(Duration::from_secs(300)),
            max_attempts: 3,
            metrics_enabled: cfg!(feature = "metrics"),
            log_level: "info".to_string(),
        }
    }
}

impl Config {
    /// Load configuration from a JSON file, falling back to defaults for any
    /// missing fields.
    ///
    /// # Errors
    ///
    /// Returns an error in two cases: the file cannot be read, or its contents
    /// are not valid JSON for [`Config`]. Both error messages name the
    /// offending path.
    pub fn from_file(path: impl Into<PathBuf>) -> anyhow::Result<Self> {
        let path = path.into();
        let raw = std::fs::read_to_string(&path)
            .map_err(|e| anyhow::anyhow!("reading config {}: {e}", path.display()))?;
        // Name the file in parse errors too. serde_json's message alone only
        // carries a line/column, which is ambiguous when several configs exist.
        serde_json::from_str(&raw)
            .map_err(|e| anyhow::anyhow!("parsing config {}: {e}", path.display()))
    }

    /// Apply overrides from environment variables.
    ///
    /// Recognized: `DAG_MAX_CONCURRENCY`, `DAG_STORAGE_DIR`, `DAG_MAX_ATTEMPTS`,
    /// `DAG_LOG_LEVEL`, `DAG_PERSIST_STATE`.
    ///
    /// Numeric values may carry surrounding whitespace. A numeric variable that
    /// is unset, not valid UTF-8, or fails to parse leaves the current setting
    /// untouched. `DAG_PERSIST_STATE` is true for `1`, `true`, `yes` or `on`
    /// (any ASCII case, surrounding whitespace ignored) and false for any other
    /// value.
    pub fn with_env_overrides(mut self) -> Self {
        if let Some(n) = Self::env_parsed("DAG_MAX_CONCURRENCY") {
            self.max_concurrency = n;
        }
        if let Ok(v) = std::env::var("DAG_STORAGE_DIR") {
            self.storage_dir = PathBuf::from(v);
        }
        if let Some(n) = Self::env_parsed("DAG_MAX_ATTEMPTS") {
            self.max_attempts = n;
        }
        if let Ok(v) = std::env::var("DAG_LOG_LEVEL") {
            self.log_level = v;
        }
        if let Ok(v) = std::env::var("DAG_PERSIST_STATE") {
            self.persist_state = Self::is_truthy(&v);
        }
        self
    }

    /// Read `key` from the environment and parse it, ignoring surrounding
    /// whitespace. Returns `None` if the variable is unset, non-UTF-8, or unparseable.
    fn env_parsed<T: std::str::FromStr>(key: &str) -> Option<T> {
        std::env::var(key).ok()?.trim().parse().ok()
    }

    /// Interpret an environment flag value. Exact, case-sensitive matching used
    /// to treat `TRUE` or `Yes` as false, silently disabling the feature.
    fn is_truthy(value: &str) -> bool {
        let value = value.trim();
        ["1", "true", "yes", "on"]
            .iter()
            .any(|t| value.eq_ignore_ascii_case(t))
    }
}

/// Serde helper for `Option<Duration>` expressed as whole seconds.
///
/// Despite its name, this module does not use the `humantime` text format
/// (e.g. "5m"). Values are plain non-negative integers (`300`), and `null`
/// means `None`.
///
/// Serialization rounds any sub-second remainder *up*. For example, a 500 ms
/// timeout is written as `1` rather than truncated to `0`, which would reload
/// as `Some(Duration::ZERO)`.
mod humantime_opt {
    use serde::{Deserialize, Deserializer, Serialize, Serializer};
    use std::time::Duration;

    /// Write `None` as `null` and `Some(d)` as `d` in seconds, rounded up.
    pub fn serialize<S: Serializer>(value: &Option<Duration>, ser: S) -> Result<S::Ok, S::Error> {
        value.map(ceil_secs).serialize(ser)
    }

    /// Read `null` as `None` and a non-negative integer as that many seconds.
    pub fn deserialize<'de, D: Deserializer<'de>>(de: D) -> Result<Option<Duration>, D::Error> {
        let secs = Option::<u64>::deserialize(de)?;
        Ok(secs.map(Duration::from_secs))
    }

    /// Whole seconds in `d`, rounding any fractional part up.
    ///
    /// Saturates at `u64::MAX`, so `Duration::MAX` cannot overflow.
    fn ceil_secs(d: Duration) -> u64 {
        let has_fraction = d.subsec_nanos() != 0;
        d.as_secs().saturating_add(u64::from(has_fraction))
    }
}