trazaeo 0.5.2

Open-source provenance SDK and specification for verifiable EO and climate data workflows
Documentation
use crate::content::{build_content_descriptor, ContentDescriptor, ContentDescriptorInput};
use crate::utils::{Chunk, Hash, HasherEngine};
use rayon::prelude::*;
use std::fs::File;
use std::io::{self, BufReader, Read};
use std::path::Path;

/// Hash algorithms that callers can select.
///
/// BLAKE3 is the only variant at present.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum HashAlgorithm {
    /// The BLAKE3 hash function.
    Blake3,
}

/// Default chunk size for streaming operations: 1,048,576 (1 MiB when interpreted as bytes).
pub const DEFAULT_STREAM_CHUNK_SIZE: usize = 1 << 20;

/// Checks that `algorithm` is one this module supports.
///
/// Every current variant of [`HashAlgorithm`] is accepted, so this always
/// returns `Ok(())` today.
///
/// # Errors
///
/// The `Err` side carries a static message; no existing variant produces it.
pub fn validate_hash_algorithm(algorithm: HashAlgorithm) -> Result<(), &'static str> {
    // Deliberately exhaustive (no `_` arm): adding a variant to the enum
    // forces a decision here at compile time.
    match algorithm {
        HashAlgorithm::Blake3 => {}
    }
    Ok(())
}

/// Creates a [`HasherEngine`], optionally backed by a dedicated rayon thread pool.
///
/// A pool with `pool_size` threads is requested only when `pool_size > 1`.
/// If the pool cannot be built, the failure is discarded and the engine is
/// returned without a pool, exactly as for `pool_size <= 1`.
pub fn init_hasher(pool_size: usize) -> HasherEngine {
    let pool = (pool_size > 1)
        .then(|| {
            rayon::ThreadPoolBuilder::new()
                .num_threads(pool_size)
                .build()
        })
        // Best effort: a build error degrades to "no pool" instead of failing.
        .and_then(Result::ok);

    HasherEngine { pool }
}

/// Computes the BLAKE3 hash of a single chunk's data.
///
/// The engine argument is accepted for API symmetry but is not used here.
pub fn hash_chunk(_engine: &HasherEngine, chunk: &Chunk) -> Hash {
    Hash(*blake3::hash(&chunk.data).as_bytes())
}

/// Hashes every chunk, returning one hash per chunk in input order.
///
/// When the engine owns a thread pool the chunks are hashed with a rayon
/// parallel iterator inside that pool; otherwise they are hashed
/// sequentially on the calling thread.
pub fn hash_chunks_parallel(engine: &HasherEngine, chunks: Vec<Chunk>) -> Vec<Hash> {
    let hash_one = |chunk: Chunk| hash_chunk(engine, &chunk);

    match engine.pool.as_ref() {
        Some(pool) => pool.install(|| chunks.into_par_iter().map(hash_one).collect()),
        None => chunks.into_iter().map(hash_one).collect(),
    }
}

/// Computes the BLAKE3 hash of `data`, optionally using a temporary rayon pool.
///
/// With `threads <= 1`, or when the pool cannot be built, the data is hashed
/// with the plain single-call `blake3::hash`. Otherwise a pool with `threads`
/// threads is created for this call and the data is fed through
/// `Hasher::update_rayon` inside it.
pub fn blake3_hash_with_threads(data: &[u8], threads: usize) -> Hash {
    let pool = if threads > 1 {
        // A failed build silently falls back to the single-call path.
        rayon::ThreadPoolBuilder::new()
            .num_threads(threads)
            .build()
            .ok()
    } else {
        None
    };

    let digest = match pool {
        Some(pool) => pool.install(|| {
            let mut hasher = blake3::Hasher::new();
            hasher.update_rayon(data);
            hasher.finalize()
        }),
        None => blake3::hash(data),
    };

    Hash(*digest.as_bytes())
}

/// Computes one BLAKE3 hash over a sequence of chunks.
///
/// Each chunk contributes its data length (as a little-endian `u64`)
/// followed by the data itself, so chunk boundaries affect the result.
pub fn compute_rolling_hash(chunks: &[Chunk]) -> Hash {
    let mut hasher = blake3::Hasher::new();
    chunks.iter().for_each(|chunk| {
        let length_prefix = (chunk.data.len() as u64).to_le_bytes();
        hasher.update(&length_prefix).update(&chunk.data);
    });
    Hash(*hasher.finalize().as_bytes())
}

/// Derives a single root hash from an ordered list of hashes.
///
/// Each hash is fed to one BLAKE3 hasher preceded by its zero-based
/// position (as a little-endian `u64`), so the result depends on order.
pub fn compute_root_from_hashes(hashes: &[Hash]) -> Hash {
    let mut hasher = blake3::Hasher::new();
    for (position, leaf) in (0u64..).zip(hashes) {
        hasher.update(&position.to_le_bytes()).update(&leaf.0);
    }
    Hash(*hasher.finalize().as_bytes())
}

/// Hashes a reader in fixed-size chunks.
///
/// Returns the per-chunk hashes in stream order together with a root hash,
/// which is the BLAKE3 hash of the entire stream contents.
///
/// Every chunk except possibly the last is exactly `chunk_size` bytes long.
/// Short reads from the underlying reader are accumulated until the chunk
/// buffer is full or end of input is reached, so the leaf hashes depend only
/// on the stream contents and `chunk_size`, never on how the reader happens
/// to split its reads.
///
/// `threads` is forwarded to [`init_hasher`]; chunks are still hashed one at
/// a time as they are read.
///
/// # Errors
///
/// Returns [`io::ErrorKind::InvalidInput`] when `chunk_size` is zero, and
/// propagates any I/O error from `reader` other than
/// [`io::ErrorKind::Interrupted`], which is retried.
pub fn hash_reader_streaming<R: Read>(
    reader: &mut R,
    chunk_size: usize,
    threads: usize,
) -> io::Result<(Vec<Hash>, Hash)> {
    if chunk_size == 0 {
        return Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            "chunk_size must be greater than zero",
        ));
    }

    validate_hash_algorithm(HashAlgorithm::Blake3).map_err(io::Error::other)?;

    let engine = init_hasher(threads);
    let mut hashes = Vec::new();
    let mut buffer = vec![0u8; chunk_size];
    let mut root_hasher = blake3::Hasher::new();

    loop {
        // `Read::read` may return fewer bytes than requested even before EOF
        // (e.g. a `BufReader` draining the tail of its internal buffer), so
        // keep reading until the chunk is full or the stream ends.
        let mut filled = 0;
        while filled < chunk_size {
            match reader.read(&mut buffer[filled..]) {
                Ok(0) => break,
                Ok(n) => filled += n,
                // Interrupted reads are documented as non-fatal: retry.
                Err(err) if err.kind() == io::ErrorKind::Interrupted => continue,
                Err(err) => return Err(err),
            }
        }
        if filled == 0 {
            break;
        }

        let chunk = Chunk {
            data: buffer[..filled].to_vec(),
        };
        root_hasher.update(&chunk.data);
        hashes.push(hash_chunk(&engine, &chunk));

        if filled < chunk_size {
            // The inner loop only stops short of a full buffer at EOF, so
            // this was the final (partial) chunk.
            break;
        }
    }

    Ok((hashes, Hash(*root_hasher.finalize().as_bytes())))
}

/// Computes only the root hash of a reader's contents.
///
/// The stream is read through a buffer of `chunk_size` bytes and fed to a
/// single BLAKE3 hasher, so the result is the BLAKE3 hash of the whole
/// stream and does not depend on `chunk_size`. The `_threads` argument is
/// currently unused.
///
/// # Errors
///
/// Returns [`io::ErrorKind::InvalidInput`] when `chunk_size` is zero, and
/// propagates any I/O error from `reader` other than
/// [`io::ErrorKind::Interrupted`], which is retried.
pub fn hash_reader_streaming_root<R: Read>(
    reader: &mut R,
    chunk_size: usize,
    _threads: usize,
) -> io::Result<Hash> {
    if chunk_size == 0 {
        return Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            "chunk_size must be greater than zero",
        ));
    }

    validate_hash_algorithm(HashAlgorithm::Blake3).map_err(io::Error::other)?;

    let mut buffer = vec![0u8; chunk_size];
    let mut root_hasher = blake3::Hasher::new();

    loop {
        match reader.read(&mut buffer) {
            Ok(0) => break,
            Ok(n) => {
                root_hasher.update(&buffer[..n]);
            }
            // Interrupted reads are documented as non-fatal: retry rather
            // than abandoning a partially hashed stream.
            Err(err) if err.kind() == io::ErrorKind::Interrupted => continue,
            Err(err) => return Err(err),
        }
    }

    Ok(Hash(*root_hasher.finalize().as_bytes()))
}

/// Opens the file at `path` and hashes it with [`hash_reader_streaming`].
///
/// The file is wrapped in a [`BufReader`]; `chunk_size` and `threads` are
/// passed through unchanged.
///
/// # Errors
///
/// Returns the error from opening the file, or whatever
/// [`hash_reader_streaming`] returns.
pub fn hash_file_streaming<P: AsRef<Path>>(
    path: P,
    chunk_size: usize,
    threads: usize,
) -> io::Result<(Vec<Hash>, Hash)> {
    let mut buffered = File::open(path).map(BufReader::new)?;
    hash_reader_streaming(&mut buffered, chunk_size, threads)
}

/// Opens the file at `path` and computes its root hash with
/// [`hash_reader_streaming_root`].
///
/// The file is wrapped in a [`BufReader`]; `chunk_size` and `threads` are
/// passed through unchanged.
///
/// # Errors
///
/// Returns the error from opening the file, or whatever
/// [`hash_reader_streaming_root`] returns.
pub fn hash_file_streaming_root<P: AsRef<Path>>(
    path: P,
    chunk_size: usize,
    threads: usize,
) -> io::Result<Hash> {
    let mut buffered = BufReader::new(File::open(path)?);
    hash_reader_streaming_root(&mut buffered, chunk_size, threads)
}

/// Hashes a reader and builds a [`ContentDescriptor`] for its contents.
///
/// The stream is hashed with [`hash_reader_streaming`]. The descriptor gets
/// the resulting root hash, the number of chunk hashes as `leaf_count`, and
/// the exact number of bytes consumed from `reader` as `byte_length`.
/// `artifact_id`, `chunk_size`, `media_type` and `created_at` are passed
/// through to [`build_content_descriptor`] unchanged.
///
/// # Errors
///
/// Returns whatever [`hash_reader_streaming`] returns, including
/// [`io::ErrorKind::InvalidInput`] for a zero `chunk_size`.
pub fn hash_reader_content_descriptor<R: Read>(
    reader: &mut R,
    artifact_id: &str,
    chunk_size: usize,
    threads: usize,
    media_type: &str,
    created_at: &str,
) -> io::Result<ContentDescriptor> {
    /// Pass-through reader that tallies how many bytes have been read.
    struct CountingReader<'a, T> {
        inner: &'a mut T,
        bytes_read: u64,
    }

    impl<T: Read> Read for CountingReader<'_, T> {
        fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
            let n = self.inner.read(buf)?;
            self.bytes_read += n as u64;
            Ok(n)
        }
    }

    // The final chunk is usually shorter than `chunk_size`, so the length
    // cannot be derived from `leaf_count * chunk_size`; count the bytes that
    // actually pass through instead.
    let mut counting = CountingReader {
        inner: reader,
        bytes_read: 0,
    };
    let (hashes, root) = hash_reader_streaming(&mut counting, chunk_size, threads)?;

    Ok(build_content_descriptor(ContentDescriptorInput {
        artifact_id,
        root,
        chunk_size,
        leaf_count: hashes.len(),
        byte_length: counting.bytes_read,
        media_type,
        created_at,
    }))
}

/// Hashes file content descriptor.
pub fn hash_file_content_descriptor<P: AsRef<Path>>(
    path: P,
    artifact_id: &str,
    chunk_size: usize,
    threads: usize,
    media_type: &str,
    created_at: &str,
) -> io::Result<ContentDescriptor> {
    let path_ref = path.as_ref();
    let byte_length = std::fs::metadata(path_ref)?.len();
    let mut reader = BufReader::new(File::open(path_ref)?);
    let (hashes, root) = hash_reader_streaming(&mut reader, chunk_size, threads)?;
    Ok(build_content_descriptor(ContentDescriptorInput {
        artifact_id,
        root,
        chunk_size,
        leaf_count: hashes.len(),
        byte_length,
        media_type,
        created_at,
    }))
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::io::{Cursor, Write};
    use tempfile::NamedTempFile;

    /// Builds a chunk owning a copy of `bytes`.
    fn chunk_of(bytes: &[u8]) -> Chunk {
        Chunk {
            data: bytes.to_vec(),
        }
    }

    /// A pool is created only when more than one thread is requested.
    #[test]
    fn init_hasher_respects_pool_size() {
        let single = init_hasher(1);
        let pooled = init_hasher(2);
        assert!(single.pool.is_none());
        assert!(pooled.pool.is_some());
    }

    /// Hashing the same chunk twice yields the same hash.
    #[test]
    fn hash_chunk_is_deterministic() {
        let engine = init_hasher(1);
        let chunk = chunk_of(b"hello");
        let first = hash_chunk(&engine, &chunk);
        let second = hash_chunk(&engine, &chunk);
        assert_eq!(first, second);
    }

    /// The pooled and pool-less paths produce identical hash lists.
    #[test]
    fn hash_chunks_parallel_matches_sequential() {
        let chunks = vec![chunk_of(b"a"), chunk_of(b"bc"), chunk_of(b"def")];
        let sequential = hash_chunks_parallel(&init_hasher(1), chunks.clone());
        let parallel = hash_chunks_parallel(&init_hasher(4), chunks);
        assert_eq!(sequential, parallel);
    }

    /// The thread count does not influence the resulting hash.
    #[test]
    fn blake3_hash_with_threads_consistent_across_thread_counts() {
        let data = b"consistency check data";
        let one_thread = blake3_hash_with_threads(data, 1);
        let four_threads = blake3_hash_with_threads(data, 4);
        assert_eq!(one_thread, four_threads);
    }

    /// BLAKE3 passes validation.
    #[test]
    fn validate_hash_algorithm_accepts_blake3() {
        let verdict = validate_hash_algorithm(HashAlgorithm::Blake3);
        assert!(verdict.is_ok());
    }

    /// The rolling hash of the same chunks is stable across calls.
    #[test]
    fn compute_rolling_hash_is_deterministic() {
        let chunks = vec![chunk_of(b"ab"), chunk_of(b"cd")];
        let first = compute_rolling_hash(&chunks);
        let second = compute_rolling_hash(&chunks);
        assert_eq!(first, second);
    }

    /// Swapping two leaves changes the computed root.
    #[test]
    fn compute_root_from_hashes_is_order_sensitive() {
        let h1 = Hash([1u8; 32]);
        let h2 = Hash([2u8; 32]);
        let forward = compute_root_from_hashes(&[h1.clone(), h2.clone()]);
        let reversed = compute_root_from_hashes(&[h2, h1]);
        assert_ne!(forward, reversed);
    }

    /// Streaming leaves match hand-built chunks; the root is the hash of the whole input.
    #[test]
    fn hash_reader_streaming_matches_manual_chunk_hashes() {
        let mut reader = Cursor::new(b"abcdefgh".to_vec());
        let (hashes, root) = hash_reader_streaming(&mut reader, 3, 1).expect("streaming hash");

        let manual_chunks = vec![chunk_of(b"abc"), chunk_of(b"def"), chunk_of(b"gh")];
        let expected_hashes = hash_chunks_parallel(&init_hasher(1), manual_chunks);
        let expected_root = Hash(*blake3::hash(b"abcdefgh").as_bytes());

        assert_eq!(hashes, expected_hashes);
        assert_eq!(root, expected_root);
    }

    /// A zero chunk size is rejected as invalid input.
    #[test]
    fn hash_reader_streaming_rejects_zero_chunk_size() {
        let mut reader = Cursor::new(b"abc".to_vec());
        let err = hash_reader_streaming(&mut reader, 0, 1).expect_err("must fail");
        assert_eq!(err.kind(), io::ErrorKind::InvalidInput);
    }

    /// Hashing the same file with different thread counts gives the same result.
    #[test]
    fn hash_file_streaming_is_deterministic() {
        let mut file = NamedTempFile::new().expect("temp file");
        file.write_all(b"streaming-file-content").expect("write");
        let path = file.path();

        let (hashes_a, root_a) = hash_file_streaming(path, 4, 1).expect("hash a");
        let (hashes_b, root_b) = hash_file_streaming(path, 4, 4).expect("hash b");

        assert_eq!(hashes_a, hashes_b);
        assert_eq!(root_a, root_b);
    }

    /// The root-only variant agrees with the root from the full variant.
    #[test]
    fn hash_reader_streaming_root_matches_full_variant_root() {
        let data = b"abcdefghijklmno".to_vec();
        let mut full_reader = Cursor::new(data.clone());
        let mut root_reader = Cursor::new(data);

        let (_hashes, full_root) = hash_reader_streaming(&mut full_reader, 4, 2).expect("full");
        let root_only = hash_reader_streaming_root(&mut root_reader, 4, 2).expect("root");

        assert_eq!(full_root, root_only);
    }

    /// The descriptor carries the artifact id and the number of leaves.
    #[test]
    fn hash_reader_content_descriptor_reports_metadata() {
        let mut reader = Cursor::new(b"abcdefg".to_vec());
        let descriptor = hash_reader_content_descriptor(
            &mut reader,
            "artifact-1",
            3,
            1,
            "application/octet-stream",
            "2026-01-01T00:00:00Z",
        )
        .expect("descriptor");

        assert_eq!(descriptor.artifact_id, "artifact-1");
        assert_eq!(descriptor.leaf_count, 3);
    }
}