use crate::content::{build_content_descriptor, ContentDescriptor, ContentDescriptorInput};
use crate::utils::{Chunk, Hash, HasherEngine};
use rayon::prelude::*;
use sha2::{Digest, Sha256};
use std::fs::File;
use std::io::{self, BufReader, Read};
use std::path::Path;
/// Hash algorithms recognised by this module.
///
/// Only one variant exists; `validate_hash_algorithm` matches on this enum
/// exhaustively, so adding a variant forces that function to be revisited.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HashAlgorithm {
/// BLAKE3, the algorithm used by every hashing function in this file.
Blake3,
}
/// Default chunk size in bytes: 1 MiB (`1024 * 1024`).
///
/// NOTE(review): not referenced elsewhere in this file; presumably intended as
/// the default `chunk_size` argument for the streaming functions — confirm
/// against callers.
pub const DEFAULT_STREAM_CHUNK_SIZE: usize = 1024 * 1024;
/// Checks that `algorithm` is supported.
///
/// Every variant of [`HashAlgorithm`] is currently accepted, so this always
/// returns `Ok(())`.
pub fn validate_hash_algorithm(algorithm: HashAlgorithm) -> Result<(), &'static str> {
    // Irrefutable today because the enum has a single variant; adding a new
    // variant turns this into a compile error, forcing an explicit decision.
    let HashAlgorithm::Blake3 = algorithm;
    Ok(())
}
/// Creates a [`HasherEngine`], optionally backed by a dedicated rayon thread pool.
///
/// * `pool_size` — number of worker threads requested. Values of `0` or `1`
///   produce an engine without a pool.
///
/// If the pool cannot be built, the failure is discarded and the engine is
/// returned without a pool.
pub fn init_hasher(pool_size: usize) -> HasherEngine {
    let pool = match pool_size {
        0 | 1 => None,
        workers => rayon::ThreadPoolBuilder::new()
            .num_threads(workers)
            .build()
            .ok(),
    };
    HasherEngine { pool }
}
/// Returns the BLAKE3 digest of `chunk.data`.
///
/// The engine argument is accepted for API symmetry but is not consulted.
pub fn hash_chunk(_engine: &HasherEngine, chunk: &Chunk) -> Hash {
    Hash(blake3::hash(&chunk.data).into())
}
/// Hashes every chunk with [`hash_chunk`], returning digests in input order.
///
/// When `engine` owns a thread pool the work is distributed across it;
/// otherwise the chunks are hashed one after another on the calling thread.
pub fn hash_chunks_parallel(engine: &HasherEngine, chunks: Vec<Chunk>) -> Vec<Hash> {
    match &engine.pool {
        Some(workers) => workers.install(|| {
            chunks
                .into_par_iter()
                .map(|chunk| hash_chunk(engine, &chunk))
                .collect()
        }),
        None => chunks
            .into_iter()
            .map(|chunk| hash_chunk(engine, &chunk))
            .collect(),
    }
}
/// Computes the BLAKE3 digest of `data`, optionally using multiple threads.
///
/// * `threads <= 1` — hashes on the calling thread.
/// * `threads > 1` — builds a temporary rayon pool of that size and hashes
///   inside it via `update_rayon`. If the pool cannot be built, falls back to
///   hashing on the calling thread.
///
/// The digest is the same regardless of which path is taken.
pub fn blake3_hash_with_threads(data: &[u8], threads: usize) -> Hash {
    let dedicated_pool = if threads > 1 {
        rayon::ThreadPoolBuilder::new()
            .num_threads(threads)
            .build()
            .ok()
    } else {
        None
    };
    let digest = match dedicated_pool {
        Some(pool) => pool.install(|| {
            let mut state = blake3::Hasher::new();
            state.update_rayon(data);
            state.finalize()
        }),
        None => blake3::hash(data),
    };
    Hash(*digest.as_bytes())
}
/// Folds all chunks into a single BLAKE3 digest.
///
/// Each chunk contributes its length (as a little-endian `u64`) followed by
/// its bytes, so chunk boundaries influence the result.
pub fn compute_rolling_hash(chunks: &[Chunk]) -> Hash {
    let digest = chunks
        .iter()
        .fold(blake3::Hasher::new(), |mut state, chunk| {
            let length_prefix = (chunk.data.len() as u64).to_le_bytes();
            state.update(&length_prefix);
            state.update(&chunk.data);
            state
        })
        .finalize();
    Hash(*digest.as_bytes())
}
/// Derives a single root digest from an ordered list of hashes.
///
/// Each hash is fed to BLAKE3 prefixed by its position (little-endian `u64`),
/// which makes the result sensitive to ordering.
pub fn compute_root_from_hashes(hashes: &[Hash]) -> Hash {
    let mut state = blake3::Hasher::new();
    for (position, leaf) in (0u64..).zip(hashes) {
        state.update(&position.to_le_bytes());
        state.update(&leaf.0);
    }
    Hash(*state.finalize().as_bytes())
}
fn validate_streaming_config(chunk_size: usize) -> io::Result<()> {
if chunk_size == 0 {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
"chunk_size must be greater than zero",
));
}
validate_hash_algorithm(HashAlgorithm::Blake3).map_err(io::Error::other)
}
/// Streams `reader` to the end and returns `(chunk_hashes, root)`.
///
/// `chunk_hashes` holds one BLAKE3 digest per chunk of at most `chunk_size`
/// bytes; `root` is the BLAKE3 digest of the entire stream. This is a thin
/// wrapper over `hash_reader_streaming_with_sha256` that drops the SHA-256
/// digest and byte count.
///
/// # Errors
///
/// Returns an error if `chunk_size` is zero or if reading fails.
pub fn hash_reader_streaming<R: Read>(
    reader: &mut R,
    chunk_size: usize,
    threads: usize,
) -> io::Result<(Vec<Hash>, Hash)> {
    hash_reader_streaming_with_sha256(reader, chunk_size, threads)
        .map(|(chunk_hashes, root, _sha256_hex, _total_bytes)| (chunk_hashes, root))
}
/// Reads from `reader` until `buf` is completely filled or end of input.
///
/// Returns the number of bytes written to the front of `buf`. A result smaller
/// than `buf.len()` means the reader reported end of input. Reads that fail
/// with `ErrorKind::Interrupted` are retried.
fn fill_chunk_buffer<R: Read>(reader: &mut R, buf: &mut [u8]) -> io::Result<usize> {
    let mut filled = 0;
    while filled < buf.len() {
        match reader.read(&mut buf[filled..]) {
            Ok(0) => break,
            Ok(n) => filled += n,
            Err(err) if err.kind() == io::ErrorKind::Interrupted => continue,
            Err(err) => return Err(err),
        }
    }
    Ok(filled)
}

/// Streams `reader` to the end, hashing it in fixed-size chunks.
///
/// Returns `(chunk_hashes, root, sha256_hex, byte_length)` where:
/// * `chunk_hashes` — one BLAKE3 digest per chunk. Every chunk is exactly
///   `chunk_size` bytes except possibly the last.
/// * `root` — BLAKE3 digest of the whole stream.
/// * `sha256_hex` — hex-encoded SHA-256 digest of the whole stream.
/// * `byte_length` — total number of bytes consumed.
///
/// # Errors
///
/// Returns an error if `chunk_size` is zero or if the reader fails with
/// anything other than `ErrorKind::Interrupted`.
fn hash_reader_streaming_with_sha256<R: Read>(
    reader: &mut R,
    chunk_size: usize,
    threads: usize,
) -> io::Result<(Vec<Hash>, Hash, String, u64)> {
    validate_streaming_config(chunk_size)?;
    let engine = init_hasher(threads);
    let mut hashes = Vec::new();
    let mut buffer = vec![0u8; chunk_size];
    let mut root_hasher = blake3::Hasher::new();
    let mut c2pa_hasher = Sha256::new();
    let mut byte_length = 0u64;
    loop {
        // A single `read` may return fewer bytes than requested even when more
        // data follows (e.g. `BufReader`, pipes). Fill the buffer completely so
        // chunk boundaries depend only on `chunk_size`, not on the reader.
        let n = fill_chunk_buffer(reader, &mut buffer)?;
        if n == 0 {
            break;
        }
        let chunk = Chunk {
            data: buffer[..n].to_vec(),
        };
        byte_length += n as u64;
        root_hasher.update(&chunk.data);
        c2pa_hasher.update(&chunk.data);
        hashes.push(hash_chunk(&engine, &chunk));
        if n < chunk_size {
            // A partially filled buffer means the reader already signalled
            // end of input; this was the final chunk.
            break;
        }
    }
    Ok((
        hashes,
        Hash(*root_hasher.finalize().as_bytes()),
        hex::encode(c2pa_hasher.finalize()),
        byte_length,
    ))
}
/// Streams `reader` to the end and returns the BLAKE3 digest of all bytes read.
///
/// * `chunk_size` — size of the read buffer in bytes; must be non-zero. It does
///   not affect the resulting digest.
/// * `_threads` — accepted for signature parity with the other streaming
///   functions; unused.
///
/// # Errors
///
/// Returns an error if `chunk_size` is zero or if the reader fails with
/// anything other than `ErrorKind::Interrupted`, which is retried.
pub fn hash_reader_streaming_root<R: Read>(
    reader: &mut R,
    chunk_size: usize,
    _threads: usize,
) -> io::Result<Hash> {
    validate_streaming_config(chunk_size)?;
    let mut buffer = vec![0u8; chunk_size];
    let mut root_hasher = blake3::Hasher::new();
    loop {
        let n = match reader.read(&mut buffer) {
            Ok(0) => break,
            Ok(n) => n,
            // An interrupted read is non-fatal; retry instead of aborting.
            Err(err) if err.kind() == io::ErrorKind::Interrupted => continue,
            Err(err) => return Err(err),
        };
        root_hasher.update(&buffer[..n]);
    }
    let h = root_hasher.finalize();
    Ok(Hash(*h.as_bytes()))
}
/// Opens the file at `path` and hashes it with [`hash_reader_streaming`].
///
/// Returns the per-chunk hashes together with the root hash.
///
/// # Errors
///
/// Returns an error if the file cannot be opened, if `chunk_size` is zero, or
/// if reading fails.
pub fn hash_file_streaming<P: AsRef<Path>>(
    path: P,
    chunk_size: usize,
    threads: usize,
) -> io::Result<(Vec<Hash>, Hash)> {
    let mut buffered = File::open(path).map(BufReader::new)?;
    hash_reader_streaming(&mut buffered, chunk_size, threads)
}
/// Opens the file at `path` and computes its root hash with
/// [`hash_reader_streaming_root`].
///
/// # Errors
///
/// Returns an error if the file cannot be opened, if `chunk_size` is zero, or
/// if reading fails.
pub fn hash_file_streaming_root<P: AsRef<Path>>(
    path: P,
    chunk_size: usize,
    threads: usize,
) -> io::Result<Hash> {
    let mut buffered = File::open(path).map(BufReader::new)?;
    hash_reader_streaming_root(&mut buffered, chunk_size, threads)
}
/// Hashes `reader` and assembles a [`ContentDescriptor`] describing it.
///
/// The descriptor is built from the root hash, the number of chunk hashes, the
/// number of bytes read, and the caller-supplied `artifact_id`, `chunk_size`,
/// `media_type` and `created_at`. A C2PA hard binding carrying the hex-encoded
/// SHA-256 digest of the stream is appended to `c2pa_hard_bindings`.
///
/// # Errors
///
/// Returns an error if `chunk_size` is zero or if reading fails.
pub fn hash_reader_content_descriptor<R: Read>(
    reader: &mut R,
    artifact_id: &str,
    chunk_size: usize,
    threads: usize,
    media_type: &str,
    created_at: &str,
) -> io::Result<ContentDescriptor> {
    let (leaf_hashes, root, sha256_hex, byte_length) =
        hash_reader_streaming_with_sha256(reader, chunk_size, threads)?;
    let leaf_count = leaf_hashes.len();
    let sha256_binding = crate::c2pa::C2paHardBinding {
        alg: crate::c2pa::C2PA_HASH_ALG_SHA256.to_string(),
        hash: sha256_hex,
    };
    let mut descriptor = build_content_descriptor(ContentDescriptorInput {
        artifact_id,
        root,
        chunk_size,
        leaf_count,
        byte_length,
        media_type,
        created_at,
    });
    descriptor.c2pa_hard_bindings.push(sha256_binding);
    Ok(descriptor)
}
pub fn hash_file_content_descriptor<P: AsRef<Path>>(
path: P,
artifact_id: &str,
chunk_size: usize,
threads: usize,
media_type: &str,
created_at: &str,
) -> io::Result<ContentDescriptor> {
let path_ref = path.as_ref();
let byte_length = std::fs::metadata(path_ref)?.len();
let mut reader = BufReader::new(File::open(path_ref)?);
let (hashes, root, sha256, _byte_length) =
hash_reader_streaming_with_sha256(&mut reader, chunk_size, threads)?;
let mut descriptor = build_content_descriptor(ContentDescriptorInput {
artifact_id,
root,
chunk_size,
leaf_count: hashes.len(),
byte_length,
media_type,
created_at,
});
descriptor
.c2pa_hard_bindings
.push(crate::c2pa::C2paHardBinding {
alg: crate::c2pa::C2PA_HASH_ALG_SHA256.to_string(),
hash: sha256,
});
Ok(descriptor)
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;
    use std::io::Write;
    use tempfile::NamedTempFile;

    /// Wraps a copy of `bytes` in a `Chunk`.
    fn chunk_of(bytes: &[u8]) -> Chunk {
        Chunk {
            data: bytes.to_vec(),
        }
    }

    #[test]
    fn init_hasher_respects_pool_size() {
        let sequential = init_hasher(1);
        let pooled = init_hasher(2);
        assert!(sequential.pool.is_none());
        assert!(pooled.pool.is_some());
    }

    #[test]
    fn hash_chunk_is_deterministic() {
        let engine = init_hasher(1);
        let chunk = chunk_of(b"hello");
        let first = hash_chunk(&engine, &chunk);
        let second = hash_chunk(&engine, &chunk);
        assert_eq!(first, second);
    }

    #[test]
    fn hash_chunks_parallel_matches_sequential() {
        // Build the same input twice rather than cloning a shared vector.
        let inputs = || vec![chunk_of(b"a"), chunk_of(b"bc"), chunk_of(b"def")];
        let seq = hash_chunks_parallel(&init_hasher(1), inputs());
        let par = hash_chunks_parallel(&init_hasher(4), inputs());
        assert_eq!(seq, par);
    }

    #[test]
    fn blake3_hash_with_threads_consistent_across_thread_counts() {
        let data = b"consistency check data";
        assert_eq!(
            blake3_hash_with_threads(data, 1),
            blake3_hash_with_threads(data, 4)
        );
    }

    #[test]
    fn validate_hash_algorithm_accepts_blake3() {
        assert_eq!(validate_hash_algorithm(HashAlgorithm::Blake3), Ok(()));
    }

    #[test]
    fn compute_rolling_hash_is_deterministic() {
        let chunks = [chunk_of(b"ab"), chunk_of(b"cd")];
        let first = compute_rolling_hash(&chunks);
        let second = compute_rolling_hash(&chunks);
        assert_eq!(first, second);
    }

    #[test]
    fn compute_root_from_hashes_is_order_sensitive() {
        let (h1, h2) = (Hash([1u8; 32]), Hash([2u8; 32]));
        let forward = compute_root_from_hashes(&[h1.clone(), h2.clone()]);
        let reversed = compute_root_from_hashes(&[h2, h1]);
        assert_ne!(forward, reversed);
    }

    #[test]
    fn hash_reader_streaming_matches_manual_chunk_hashes() {
        let mut reader = Cursor::new(b"abcdefgh".to_vec());
        let (hashes, root) = hash_reader_streaming(&mut reader, 3, 1).expect("streaming hash");
        let expected_hashes = hash_chunks_parallel(
            &init_hasher(1),
            vec![chunk_of(b"abc"), chunk_of(b"def"), chunk_of(b"gh")],
        );
        assert_eq!(hashes, expected_hashes);
        assert_eq!(root, Hash(*blake3::hash(b"abcdefgh").as_bytes()));
    }

    #[test]
    fn hash_reader_streaming_rejects_zero_chunk_size() {
        let mut reader = Cursor::new(b"abc".to_vec());
        let err = hash_reader_streaming(&mut reader, 0, 1).expect_err("must fail");
        assert_eq!(err.kind(), io::ErrorKind::InvalidInput);
    }

    #[test]
    fn hash_file_streaming_is_deterministic() {
        let mut file = NamedTempFile::new().expect("temp file");
        file.write_all(b"streaming-file-content").expect("write");
        let single_threaded = hash_file_streaming(file.path(), 4, 1).expect("hash a");
        let multi_threaded = hash_file_streaming(file.path(), 4, 4).expect("hash b");
        assert_eq!(single_threaded.0, multi_threaded.0);
        assert_eq!(single_threaded.1, multi_threaded.1);
    }

    #[test]
    fn hash_reader_streaming_root_matches_full_variant_root() {
        let data = b"abcdefghijklmno".to_vec();
        let mut full_reader = Cursor::new(data.clone());
        let mut root_reader = Cursor::new(data);
        let (_hashes, full_root) = hash_reader_streaming(&mut full_reader, 4, 2).expect("full");
        let root_only = hash_reader_streaming_root(&mut root_reader, 4, 2).expect("root");
        assert_eq!(full_root, root_only);
    }

    #[test]
    fn hash_reader_content_descriptor_reports_metadata() {
        let mut reader = Cursor::new(b"abcdefg".to_vec());
        let descriptor = hash_reader_content_descriptor(
            &mut reader,
            "artifact-1",
            3,
            1,
            "application/octet-stream",
            "2026-01-01T00:00:00Z",
        )
        .expect("descriptor");
        assert_eq!(descriptor.artifact_id, "artifact-1");
        assert_eq!(descriptor.leaf_count, 3);
    }
}