dag-executor 0.1.0

A production-ready DAG executor with state management and advanced patterns
Documentation
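//! Pluggable JSON key/value storage: a file-backed store with checksums and
//! configurable write durability (in-place, atomic rename, or fsync'd), plus
//! an in-memory store.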

use crate::error::StorageError;
use async_trait::async_trait;
use parking_lot::Mutex;
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use std::collections::HashMap;
use std::io;
use std::path::{Path, PathBuf};

/// Result alias local to the storage layer.
///
/// Every fallible operation in this module returns this type, with
/// [`StorageError`] as the error.
pub type StorageResult<T> = std::result::Result<T, StorageError>;

/// A pluggable key/value store for JSON values.
///
/// The executor depends only on this trait, so persistence can be swapped
/// (file, in-memory, or a future remote backend) without touching the engine.
///
/// Implementations must be `Send + Sync`, and every method takes `&self`, so a
/// single store can be shared between threads and tasks.
#[async_trait]
pub trait Storage: Send + Sync {
    /// Persist `value` under `key`, overwriting any existing entry.
    ///
    /// # Errors
    ///
    /// Returns a [`StorageError`] if the backend cannot store the value.
    async fn save(&self, key: &str, value: &serde_json::Value) -> StorageResult<()>;
    /// Load the value stored under `key`, or `None` if absent.
    ///
    /// # Errors
    ///
    /// Returns a [`StorageError`] if the backend fails or the stored record
    /// cannot be returned intact.
    async fn load(&self, key: &str) -> StorageResult<Option<serde_json::Value>>;
    /// Remove `key`. Removing a missing key is a no-op (not an error).
    async fn delete(&self, key: &str) -> StorageResult<()>;
    /// List all keys currently present.
    ///
    /// The trait does not promise any particular ordering of the returned keys.
    async fn list(&self) -> StorageResult<Vec<String>>;
}

/// On-disk envelope: the original key, a checksum, and the data.
///
/// The key is stored so [`FileStorage::list`] can recover it, since the
/// filename is a hash and not human-readable.
///
/// This struct is serialized directly to the record file, so its field names
/// are part of the on-disk format.
#[derive(Serialize, Deserialize)]
struct Envelope {
    // The caller-supplied key, exactly as passed to `save`.
    key: String,
    // Lowercase hex SHA-256 of `data`'s JSON serialization (see `checksum`).
    checksum: String,
    // The stored JSON value itself.
    data: serde_json::Value,
}

/// Compute the lowercase-hex SHA-256 digest of `value`'s JSON serialization.
///
/// The digest is taken over the bytes produced by `serde_json::to_vec`, so two
/// values hash identically exactly when they serialize identically.
///
/// NOTE(review): this presumably relies on `serde_json` emitting object keys
/// in a stable order for a given value — confirm against the crate features
/// enabled for this build (e.g. `preserve_order`).
///
/// # Errors
///
/// Propagates the serialization error if `value` cannot be encoded.
fn checksum(value: &serde_json::Value) -> StorageResult<String> {
    let serialized = serde_json::to_vec(value)?;
    // One-shot digest: equivalent to new() + update() + finalize().
    let digest = Sha256::digest(&serialized);
    Ok(format!("{:x}", digest))
}

/// Derive a filesystem-safe filename from an arbitrary key.
///
/// The result is the lowercase-hex SHA-256 of the key's bytes followed by
/// `.json`. Because only the digest reaches the path, the key's own characters
/// (separators, `..`, and so on) can never influence where the file lands.
///
/// # Errors
///
/// Returns [`StorageError::InvalidKey`] when the key is empty or longer than
/// 1024 bytes (the limit is on the UTF-8 byte length, not the character count).
fn safe_filename(key: &str) -> StorageResult<String> {
    let length_ok = (1..=1024).contains(&key.len());
    if !length_ok {
        return Err(StorageError::InvalidKey(key.to_string()));
    }
    let digest = Sha256::digest(key.as_bytes());
    Ok(format!("{:x}.json", digest))
}

/// Best-effort `fsync` of the directory containing `path`.
///
/// Syncing the directory makes a rename into it durable across power loss.
/// Every failure is deliberately ignored — a path without a parent, a parent
/// that cannot be opened, or a failing sync all leave this function silent —
/// because the caller has already synced the data file itself.
#[cfg(unix)]
fn sync_parent_dir(path: &Path) {
    let dir_handle = path
        .parent()
        .and_then(|dir| std::fs::File::open(dir).ok());
    if let Some(handle) = dir_handle {
        // The result is discarded on purpose: this step is advisory only.
        let _ = handle.sync_all();
    }
}

/// On non-Unix platforms a directory cannot be opened and `fsync`'d through the
/// std API; rely on the filesystem's own metadata journaling (e.g. NTFS).
///
/// This variant is intentionally a no-op so callers can invoke
/// `sync_parent_dir` unconditionally on every platform.
#[cfg(not(unix))]
fn sync_parent_dir(_path: &Path) {}

/// Write-durability strategy for [`FileStorage`].
///
/// The variants are ordered from fastest/weakest to slowest/strongest.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Durability {
    /// Overwrite the target file in place (open + truncate + write).
    ///
    /// Fastest: after a key's file exists, each save reuses the same inode/MFT
    /// entry with no file creation or rename, and nothing is `fsync`'d. A crash
    /// mid-write can leave a truncated or partially written record. Such a
    /// record is detected on load — either it no longer parses as an envelope
    /// or its stored checksum does not match its data — and `load` reports it
    /// as an error instead of returning the damaged value.
    ///
    /// NOTE(review): `load` surfaces a damaged record as `Err`, not as an
    /// absent key. The re-run-incomplete-work model therefore depends on the
    /// caller treating that error as "missing" — confirm the executor does.
    Fast,
    /// Write to a unique temp file, then atomically `rename` it over the target.
    ///
    /// A crash never destroys the previous good record (you always read either
    /// the old or the new value), at the cost of a file create + rename per
    /// save. Bytes may still be sitting in the OS page cache, so this protects
    /// against process crashes but not necessarily power loss.
    Atomic,
    /// Like [`Durability::Atomic`], but `fsync`s the data to physical storage
    /// before the rename and best-effort `fsync`s the directory afterward.
    ///
    /// The strongest guarantee — survives power loss / kernel panic — and the
    /// slowest (each save waits on disk flush). The directory sync is a no-op on
    /// platforms where it is not available (e.g. Windows, where NTFS journaling
    /// makes the rename durable in practice).
    Durable,
}

/// JSON storage backed by one file per key, with SHA-256 checksums.
///
/// Each file carries a checksum that is verified on load, and filenames are
/// hashes of the key so arbitrary keys are safe and path traversal is
/// impossible. The [`Durability`] mode controls the write strategy; the default
/// ([`FileStorage::open`]) is [`Durability::Fast`].
pub struct FileStorage {
    // Directory that holds one `<sha256-of-key>.json` file per stored key.
    root: PathBuf,
    // Write strategy applied by every `save` on this store.
    durability: Durability,
}

impl FileStorage {
    /// Open a store rooted at `dir` using the in-place [`Durability::Fast`]
    /// write strategy, creating the directory if it does not exist.
    ///
    /// # Errors
    ///
    /// Fails if the directory (or any missing ancestor) cannot be created.
    pub fn open(dir: impl AsRef<Path>) -> StorageResult<Self> {
        Self::open_with(dir, Durability::Fast)
    }

    /// Open a store rooted at `dir` with the given [`Durability`] mode,
    /// creating the directory if it does not exist.
    ///
    /// # Errors
    ///
    /// Fails if the directory (or any missing ancestor) cannot be created.
    pub fn open_with(dir: impl AsRef<Path>, durability: Durability) -> StorageResult<Self> {
        let root = PathBuf::from(dir.as_ref());
        std::fs::create_dir_all(&root)?;
        Ok(Self { root, durability })
    }

    /// The write-durability mode this store was opened with.
    pub fn durability(&self) -> Durability {
        self.durability
    }

    /// Resolve `key` to the path of its record file inside the store root.
    ///
    /// Fails with whatever `safe_filename` reports for an unacceptable key.
    fn path_for(&self, key: &str) -> StorageResult<PathBuf> {
        safe_filename(key).map(|file_name| self.root.join(file_name))
    }
}

/// Stage `bytes` in a uniquely named temp file under `root`, then atomically
/// rename it over `path`.
///
/// When `sync` is true the data is `fsync`'d before the rename and the parent
/// directory is best-effort synced afterwards. If any step up to and including
/// the rename fails, the temp file is removed (best effort) so failed saves do
/// not leave `.tmp-*` files accumulating in the store directory; the original
/// error is returned unchanged.
fn write_via_rename(root: &Path, path: &Path, bytes: &[u8], sync: bool) -> io::Result<()> {
    /// Write (and optionally fsync) the temp file, close it, and rename it
    /// into place.
    fn stage(tmp: &Path, dest: &Path, bytes: &[u8], sync: bool) -> io::Result<()> {
        use std::io::Write;
        let mut f = std::fs::OpenOptions::new()
            .create(true)
            .write(true)
            .truncate(true)
            .open(tmp)?;
        f.write_all(bytes)?;
        if sync {
            // Force the data to physical storage before we rename, so the
            // renamed-in file is never a cached-but-not-persisted ghost after
            // power loss.
            f.sync_all()?;
        }
        // Close the handle before the rename.
        drop(f);
        // rename is atomic on the same filesystem.
        std::fs::rename(tmp, dest)
    }

    let tmp = root.join(format!(".tmp-{}", uuid::Uuid::new_v4()));
    if let Err(err) = stage(&tmp, path, bytes, sync) {
        // Best-effort cleanup; the temp file may not even exist if `open` failed.
        let _ = std::fs::remove_file(&tmp);
        return Err(err);
    }
    if sync {
        // Persist the rename itself by syncing the directory entry.
        sync_parent_dir(path);
    }
    Ok(())
}

#[async_trait]
impl Storage for FileStorage {
    /// Serialize `value` into an envelope (key, checksum, data) and write it to
    /// the key's record file using the store's [`Durability`] mode.
    ///
    /// # Errors
    ///
    /// Fails if the key is rejected, the value cannot be serialized, the
    /// filesystem write fails, or the blocking task itself fails (reported as
    /// an I/O error).
    async fn save(&self, key: &str, value: &serde_json::Value) -> StorageResult<()> {
        let path = self.path_for(key)?;
        let envelope = Envelope {
            key: key.to_string(),
            checksum: checksum(value)?,
            data: value.clone(),
        };
        let bytes = serde_json::to_vec(&envelope)?;
        let root = self.root.clone();
        let durability = self.durability;

        // Filesystem work is blocking; push it to the blocking pool.
        tokio::task::spawn_blocking(move || -> StorageResult<()> {
            match durability {
                Durability::Fast => {
                    use std::io::Write;
                    // Reuse the existing file when present: no create, no rename,
                    // and no fsync.
                    let mut f = std::fs::OpenOptions::new()
                        .create(true)
                        .write(true)
                        .truncate(true)
                        .open(&path)?;
                    f.write_all(&bytes)?;
                }
                Durability::Atomic => write_via_rename(&root, &path, &bytes, false)?,
                Durability::Durable => write_via_rename(&root, &path, &bytes, true)?,
            }
            Ok(())
        })
        .await
        .map_err(|e| StorageError::Io(io::Error::other(e)))?
    }

    /// Read the record for `key`, verify its checksum, and return the data.
    ///
    /// Returns `Ok(None)` only when the record file does not exist.
    ///
    /// # Errors
    ///
    /// Fails if the key is rejected, the file cannot be read, the contents do
    /// not parse as an envelope, or the stored checksum does not match the
    /// data ([`StorageError::ChecksumMismatch`]).
    async fn load(&self, key: &str) -> StorageResult<Option<serde_json::Value>> {
        let path = self.path_for(key)?;
        let bytes = match tokio::fs::read(&path).await {
            Ok(b) => b,
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
            Err(e) => return Err(e.into()),
        };
        let envelope: Envelope = serde_json::from_slice(&bytes)?;
        // NOTE(review): the checksum is recomputed from the re-serialized
        // parsed value, not from the bytes on disk. This assumes serde_json
        // round-trips every stored value exactly (floats may need the
        // `float_roundtrip` feature) — confirm.
        if checksum(&envelope.data)? != envelope.checksum {
            return Err(StorageError::ChecksumMismatch(key.to_string()));
        }
        Ok(Some(envelope.data))
    }

    /// Remove the record file for `key`; a missing file is not an error.
    ///
    /// # Errors
    ///
    /// Fails if the key is rejected or the removal fails for any reason other
    /// than the file not existing.
    async fn delete(&self, key: &str) -> StorageResult<()> {
        let path = self.path_for(key)?;
        match tokio::fs::remove_file(&path).await {
            Ok(()) => Ok(()),
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
            Err(e) => Err(e.into()),
        }
    }

    /// Collect the original keys of all readable records in the store root.
    ///
    /// Keys come back in directory-iteration order. Files that cannot be read
    /// or do not parse as an envelope are skipped silently, and checksums are
    /// not verified here, so the result can differ from what `load` would
    /// accept.
    ///
    /// # Errors
    ///
    /// Fails only if the directory itself cannot be opened or iterated.
    async fn list(&self) -> StorageResult<Vec<String>> {
        let mut keys = Vec::new();
        let mut dir = tokio::fs::read_dir(&self.root).await?;
        while let Some(entry) = dir.next_entry().await? {
            let name = entry.file_name();
            let name = name.to_string_lossy();
            // Filenames are hashes, so the real key lives inside each envelope.
            if name.ends_with(".json") && !name.starts_with(".tmp-") {
                if let Ok(bytes) = tokio::fs::read(entry.path()).await {
                    if let Ok(envelope) = serde_json::from_slice::<Envelope>(&bytes) {
                        keys.push(envelope.key);
                    }
                }
            }
        }
        Ok(keys)
    }
}

/// Thread-safe in-memory storage. Useful for tests and ephemeral runs.
///
/// Nothing is written to disk: values live only in the map below.
#[derive(Default)]
pub struct MemoryStorage {
    // Key -> value map; the mutex serializes access from concurrent callers.
    map: Mutex<HashMap<String, serde_json::Value>>,
}

impl MemoryStorage {
    /// Create an empty in-memory store.
    pub fn new() -> Self {
        Self::default()
    }
}

#[async_trait]
impl Storage for MemoryStorage {
    /// Insert a copy of `value` under `key`, replacing any previous entry.
    /// Never fails.
    async fn save(&self, key: &str, value: &serde_json::Value) -> StorageResult<()> {
        let owned_key = key.to_string();
        let owned_value = value.clone();
        {
            let mut entries = self.map.lock();
            entries.insert(owned_key, owned_value);
        }
        Ok(())
    }

    /// Return a copy of the value stored under `key`, or `None` if there is
    /// no such entry. Never fails.
    async fn load(&self, key: &str) -> StorageResult<Option<serde_json::Value>> {
        let found = {
            let entries = self.map.lock();
            entries.get(key).cloned()
        };
        Ok(found)
    }

    /// Drop the entry for `key` if one exists; a missing key is ignored.
    /// Never fails.
    async fn delete(&self, key: &str) -> StorageResult<()> {
        {
            let mut entries = self.map.lock();
            entries.remove(key);
        }
        Ok(())
    }

    /// Return a copy of every key currently in the map, in the map's own
    /// iteration order. Never fails.
    async fn list(&self) -> StorageResult<Vec<String>> {
        let keys = {
            let entries = self.map.lock();
            entries.keys().cloned().collect()
        };
        Ok(keys)
    }
}