hashtree-cli 0.2.63

Hashtree daemon and CLI - content-addressed storage with P2P sync
Documentation
#![cfg(feature = "lmdb")]

use anyhow::Result;
use hashtree_cli::HashtreeStore;
use hashtree_config::StorageBackend;
use hashtree_core::{sha256, types::Hash};
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tempfile::TempDir;

fn env_usize(name: &str, default: usize) -> usize {
    std::env::var(name)
        .ok()
        .and_then(|value| value.parse().ok())
        .unwrap_or(default)
}

fn env_u64(name: &str, default: u64) -> u64 {
    std::env::var(name)
        .ok()
        .and_then(|value| value.parse().ok())
        .unwrap_or(default)
}

fn payload(seed: u64, len: usize) -> Vec<u8> {
    let mut state = seed.wrapping_mul(0x9e37_79b9_7f4a_7c15);
    let mut out = vec![0u8; len];
    for byte in &mut out {
        state ^= state << 13;
        state ^= state >> 7;
        state ^= state << 17;
        *byte = state as u8;
    }
    out
}

#[test]
#[ignore = "stress benchmark; set HTREE_ACCESS_STRESS_* env vars for longer runs"]
fn mixed_blob_reader_writer_access_stress_lmdb() -> Result<()> {
    let blob_count = env_usize("HTREE_ACCESS_STRESS_BLOBS", 512);
    let blob_size = env_usize("HTREE_ACCESS_STRESS_BLOB_BYTES", 16 * 1024);
    let readers = env_usize("HTREE_ACCESS_STRESS_READERS", 8);
    let writers = env_usize("HTREE_ACCESS_STRESS_WRITERS", 2);
    let duration = Duration::from_secs(env_u64("HTREE_ACCESS_STRESS_SECONDS", 2));
    let max_bytes = env_u64("HTREE_ACCESS_STRESS_MAX_BYTES", 128 * 1024 * 1024);

    assert!(blob_count > 0, "HTREE_ACCESS_STRESS_BLOBS must be nonzero");

    let temp = TempDir::new()?;
    let store = Arc::new(HashtreeStore::with_options_and_backend(
        temp.path(),
        None,
        max_bytes,
        true,
        &StorageBackend::Lmdb,
    )?);

    let mut seeded = Vec::<Hash>::with_capacity(blob_count);
    for index in 0..blob_count {
        let data = payload(index as u64, blob_size);
        let hash = sha256(&data);
        store.put_cached_blob(&data)?;
        seeded.push(hash);
    }
    let seeded = Arc::new(seeded);

    let stop = Arc::new(AtomicBool::new(false));
    let reads = Arc::new(AtomicU64::new(0));
    let writes = Arc::new(AtomicU64::new(0));
    let misses = Arc::new(AtomicU64::new(0));
    let errors = Arc::new(AtomicU64::new(0));
    let start = Instant::now();

    let mut handles = Vec::new();
    for reader_id in 0..readers {
        let store = Arc::clone(&store);
        let hashes = Arc::clone(&seeded);
        let stop = Arc::clone(&stop);
        let reads = Arc::clone(&reads);
        let misses = Arc::clone(&misses);
        let errors = Arc::clone(&errors);
        handles.push(std::thread::spawn(move || {
            let mut cursor = reader_id;
            while !stop.load(Ordering::Relaxed) {
                let hash = hashes[cursor % hashes.len()];
                match store.get_blob(&hash) {
                    Ok(Some(_)) => {
                        reads.fetch_add(1, Ordering::Relaxed);
                    }
                    Ok(None) => {
                        misses.fetch_add(1, Ordering::Relaxed);
                    }
                    Err(_) => {
                        errors.fetch_add(1, Ordering::Relaxed);
                    }
                }
                cursor = cursor.wrapping_add(readers.max(1));
            }
        }));
    }

    for writer_id in 0..writers {
        let store = Arc::clone(&store);
        let stop = Arc::clone(&stop);
        let writes = Arc::clone(&writes);
        let errors = Arc::clone(&errors);
        handles.push(std::thread::spawn(move || {
            let mut index = 0u64;
            while !stop.load(Ordering::Relaxed) {
                let data = payload(1_000_000 + writer_id as u64 + index * 97, blob_size);
                match store.put_cached_blob(&data) {
                    Ok(_) => {
                        writes.fetch_add(1, Ordering::Relaxed);
                    }
                    Err(_) => {
                        errors.fetch_add(1, Ordering::Relaxed);
                    }
                }
                index = index.wrapping_add(1);
            }
        }));
    }

    while start.elapsed() < duration {
        std::thread::sleep(Duration::from_millis(50));
        let _ = store.get_storage_stats()?;
    }
    stop.store(true, Ordering::Relaxed);

    for handle in handles {
        handle.join().expect("stress worker panicked");
    }

    let stats = store.get_storage_stats()?;
    eprintln!(
        "mixed_blob_reader_writer_access_stress_lmdb: reads={} writes={} misses={} errors={} stored_objects={} total_bytes={}",
        reads.load(Ordering::Relaxed),
        writes.load(Ordering::Relaxed),
        misses.load(Ordering::Relaxed),
        errors.load(Ordering::Relaxed),
        stats.total_dags,
        stats.total_bytes
    );

    assert!(reads.load(Ordering::Relaxed) > 0, "expected blob reads");
    assert!(writes.load(Ordering::Relaxed) > 0, "expected blob writes");
    assert_eq!(errors.load(Ordering::Relaxed), 0, "stress workers failed");
    Ok(())
}