dag-executor 0.1.0

A production-ready DAG executor with state management and advanced patterns
Documentation
//! Unit tests for storage backends and the LRU cache.

use std::sync::Arc;

use dag_executor::error::StorageError;
use dag_executor::storage::{Cache, Durability, FileStorage, MemoryStorage, Storage};

#[tokio::test]
async fn file_storage_round_trips() {
    let dir = tempfile::tempdir().unwrap();
    let s = FileStorage::open(dir.path()).unwrap();
    assert_eq!(s.durability(), Durability::Fast); // fast is the default
    let v = serde_json::json!({ "n": 42 });
    s.save("k", &v).await.unwrap();
    assert_eq!(s.load("k").await.unwrap(), Some(v));
    assert_eq!(s.list().await.unwrap(), vec!["k".to_string()]);
    s.delete("k").await.unwrap();
    assert_eq!(s.load("k").await.unwrap(), None);
}

#[tokio::test]
async fn file_storage_atomic_mode_round_trips_and_overwrites() {
    let dir = tempfile::tempdir().unwrap();
    let s = FileStorage::open_with(dir.path(), Durability::Atomic).unwrap();
    s.save("k", &serde_json::json!(1)).await.unwrap();
    s.save("k", &serde_json::json!(2)).await.unwrap(); // overwrite
    assert_eq!(s.load("k").await.unwrap(), Some(serde_json::json!(2)));
    // The temp file used for the atomic rename leaves no residue.
    assert_eq!(s.list().await.unwrap(), vec!["k".to_string()]);
}

#[tokio::test]
async fn file_storage_durable_mode_round_trips() {
    let dir = tempfile::tempdir().unwrap();
    let s = FileStorage::open_with(dir.path(), Durability::Durable).unwrap();
    s.save("k", &serde_json::json!({ "v": 1 })).await.unwrap();
    s.save("k", &serde_json::json!({ "v": 2 })).await.unwrap();
    assert_eq!(
        s.load("k").await.unwrap(),
        Some(serde_json::json!({ "v": 2 }))
    );
    // fsync'd temp is renamed away, leaving exactly the one record file.
    assert_eq!(s.list().await.unwrap(), vec!["k".to_string()]);
}

#[tokio::test]
async fn fast_mode_reuses_the_same_file_across_saves() {
    let dir = tempfile::tempdir().unwrap();
    let s = FileStorage::open(dir.path()).unwrap();
    for i in 0..5 {
        s.save("k", &serde_json::json!(i)).await.unwrap();
    }
    assert_eq!(s.load("k").await.unwrap(), Some(serde_json::json!(4)));
    // Exactly one file on disk — no temp/rename churn.
    let files = std::fs::read_dir(dir.path()).unwrap().count();
    assert_eq!(files, 1);
}

#[tokio::test]
async fn file_storage_detects_corruption() {
    let dir = tempfile::tempdir().unwrap();
    let s = FileStorage::open(dir.path()).unwrap();
    s.save("k", &serde_json::json!("data")).await.unwrap();

    // Corrupt the single on-disk file (its name is a hash) so the stored
    // checksum no longer matches the data.
    let path = std::fs::read_dir(dir.path())
        .unwrap()
        .map(|e| e.unwrap().path())
        .find(|p| p.extension().map(|x| x == "json").unwrap_or(false))
        .expect("a stored file");
    let mut envelope: serde_json::Value =
        serde_json::from_slice(&std::fs::read(&path).unwrap()).unwrap();
    envelope["data"] = serde_json::json!("tampered");
    std::fs::write(&path, serde_json::to_vec(&envelope).unwrap()).unwrap();

    let err = s.load("k").await.unwrap_err();
    assert!(matches!(err, StorageError::ChecksumMismatch(_)));
}

#[tokio::test]
async fn file_storage_rejects_empty_keys() {
    let dir = tempfile::tempdir().unwrap();
    let s = FileStorage::open(dir.path()).unwrap();
    let err = s.save("", &serde_json::json!(1)).await.unwrap_err();
    assert!(matches!(err, StorageError::InvalidKey(_)));
}

#[tokio::test]
async fn file_storage_neutralizes_path_traversal_keys() {
    let dir = tempfile::tempdir().unwrap();
    let s = FileStorage::open(dir.path()).unwrap();

    // Keys that look like traversal attempts are hashed into safe filenames:
    // they round-trip correctly and never write outside the storage root.
    for key in ["../escape", "a/b", "with\\slash", "key:with:colons"] {
        s.save(key, &serde_json::json!(key)).await.unwrap();
        assert_eq!(s.load(key).await.unwrap(), Some(serde_json::json!(key)));
    }
    // Everything landed inside the storage directory (no escapes).
    let parent_files = std::fs::read_dir(dir.path().parent().unwrap())
        .unwrap()
        .filter(|e| {
            e.as_ref()
                .unwrap()
                .file_name()
                .to_string_lossy()
                .contains("escape")
        })
        .count();
    assert_eq!(parent_files, 0);

    let mut keys = s.list().await.unwrap();
    keys.sort();
    assert_eq!(keys.len(), 4);
}

#[tokio::test]
async fn memory_storage_round_trips() {
    let s = MemoryStorage::new();
    s.save("k", &serde_json::json!(1)).await.unwrap();
    assert_eq!(s.load("k").await.unwrap(), Some(serde_json::json!(1)));
}

#[tokio::test]
async fn cache_serves_and_invalidates() {
    let backend = Arc::new(MemoryStorage::new());
    let cache = Cache::new(backend.clone(), 8);

    cache.save("k", &serde_json::json!("v")).await.unwrap();
    // Mutate the backend out from under the cache; the cache should still serve
    // the value it owns (write-through correctness).
    assert_eq!(cache.load("k").await.unwrap(), Some(serde_json::json!("v")));
    assert_eq!(cache.len(), 1);

    cache.delete("k").await.unwrap();
    assert_eq!(cache.load("k").await.unwrap(), None);
}

#[tokio::test]
async fn cache_populates_on_miss() {
    let backend = Arc::new(MemoryStorage::new());
    backend.save("k", &serde_json::json!(7)).await.unwrap();
    let cache = Cache::new(backend, 8);
    assert!(cache.is_empty());
    assert_eq!(cache.load("k").await.unwrap(), Some(serde_json::json!(7)));
    assert_eq!(cache.len(), 1);
}