use super::ChunkingProfileError;
use sha2::{Digest, Sha256};
use std::collections::{BTreeMap, BTreeSet, HashMap};
/// Tuning parameters read by `CdcEngine::compute_cdc_boundaries`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CdcParameters {
/// Rolling-hash window length in bytes; passed to `RollingHash::new`.
pub window_size: usize,
/// Chunk size in bytes below which a hash match does not cut a chunk.
pub min_chunk_size: u64,
/// Chunk size in bytes at or above which a chunk is cut even without a hash match.
pub max_chunk_size: u64,
/// Value hashed with SHA-256 to choose how many low hash bits must be zero at a boundary.
pub normalization_constant: u64,
}
/// Criteria a caller passes to `ChunkReuseManager::find_reusable_chunks`.
///
/// NOTE(review): `find_reusable_chunks` binds this argument as `_criteria` and
/// does not read any field, so none of these limits are enforced by the code
/// visible in this file. Field meanings below are inferred from their names.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ChunkReuseCriteria {
/// Presumably the maximum age, in seconds, of a chunk eligible for reuse — TODO confirm.
pub max_age_seconds: u64,
/// Presumably the weakest proof strength a reusable chunk may carry — TODO confirm.
pub min_proof_strength: crate::atp::manifest::ProofStrength,
/// Presumably requires the chunk's verification algorithm to match the requester's — TODO confirm.
pub require_same_algorithm: bool,
}
/// Verification metadata carried inside a `ChunkIdentity`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ChunkVerification {
/// Name of the hash algorithm; `ChunkIdentity::from_data` sets it to `"sha256"`.
pub algorithm: String,
/// Proof strength supplied by the caller of `ChunkIdentity::from_data`.
pub proof_strength: crate::atp::manifest::ProofStrength,
}
/// One chunk produced by `CdcEngine::compute_cdc_boundaries`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CdcChunkData {
/// Offset of the chunk's first byte within the input slice.
pub byte_offset: u64,
/// Length of the chunk in bytes.
pub size_bytes: u64,
/// SHA-256 digest of the chunk's bytes.
pub content_hash: [u8; 32],
}
/// Content-defined chunking (CDC) engine. It carries no state of its own.
pub struct CdcEngine;

impl CdcEngine {
    /// Creates an engine.
    pub fn new() -> Self {
        Self
    }

    /// Splits `data` into content-defined chunks.
    ///
    /// Every byte is fed through a [`RollingHash`] over the last
    /// `params.window_size` bytes (a zero window is treated as one byte, which
    /// is also what `RollingHash::new` does). A chunk ends at a position when
    /// either
    ///
    /// * the window is full, the masked rolling hash is zero and the chunk is
    ///   at least `params.min_chunk_size` bytes long, or
    /// * the chunk has reached `params.max_chunk_size` bytes.
    ///
    /// The size limit is checked at every position, including those inside
    /// the first window, so no emitted chunk exceeds `max_chunk_size` (a
    /// `max_chunk_size` of zero degenerates to one-byte chunks). Whatever
    /// remains after the last boundary is emitted as a final chunk, which may
    /// be shorter than `min_chunk_size`.
    ///
    /// Returns the chunks in input order; an empty input yields an empty list.
    ///
    /// # Errors
    ///
    /// This implementation never returns `Err`; the `Result` is part of the
    /// existing interface.
    pub fn compute_cdc_boundaries(
        &mut self,
        data: &[u8],
        params: &CdcParameters,
    ) -> Result<Vec<CdcChunkData>, ChunkingProfileError> {
        if data.is_empty() {
            return Ok(Vec::new());
        }

        // Use the same effective window as `RollingHash::new`, which clamps 0 to 1,
        // so the byte rolled out is always the one that entered `window` steps ago.
        let window = params.window_size.max(1);
        let mut rolling_hash = RollingHash::new(window);

        let mask_bits = Self::compute_mask_bits_from_constant(params.normalization_constant);
        let boundary_mask = (1u64 << mask_bits) - 1;
        // NOTE(review): the mask covers at most 23 bits, so only the low bits of
        // `RollingHash::hash()` (its byte-sum component) take part in this test.

        let mut chunks = Vec::new();
        let mut chunk_start = 0usize;
        for (i, &byte) in data.iter().enumerate() {
            let window_full = i >= window;
            if window_full {
                rolling_hash.roll(data[i - window], byte);
            } else {
                // Still filling the first window.
                rolling_hash.update(byte);
            }

            let chunk_end = i + 1;
            // usize -> u64 is lossless on every supported target.
            let chunk_size = (chunk_end - chunk_start) as u64;
            // Hash boundaries only count once a full window has been hashed.
            let hash_boundary = window_full && (rolling_hash.hash() & boundary_mask) == 0;
            let min_size_reached = chunk_size >= params.min_chunk_size;
            let max_size_reached = chunk_size >= params.max_chunk_size;

            if (hash_boundary && min_size_reached) || max_size_reached {
                chunks.push(Self::chunk_for_range(data, chunk_start, chunk_end));
                chunk_start = chunk_end;
            }
        }

        // Trailing bytes after the last boundary form the final chunk.
        if chunk_start < data.len() {
            chunks.push(Self::chunk_for_range(data, chunk_start, data.len()));
        }
        Ok(chunks)
    }

    /// Builds the chunk record for `data[start..end]`.
    fn chunk_for_range(data: &[u8], start: usize, end: usize) -> CdcChunkData {
        CdcChunkData {
            byte_offset: start as u64,
            size_bytes: (end - start) as u64,
            content_hash: Self::compute_content_hash(&data[start..end]),
        }
    }

    /// Derives the number of boundary-mask bits from `constant`.
    ///
    /// The constant's big-endian bytes are hashed with SHA-256; the first four
    /// digest bytes, read as a big-endian `u32`, are reduced modulo 16 and
    /// offset by 8, so the result lies in `8..=23`.
    fn compute_mask_bits_from_constant(constant: u64) -> u32 {
        let mut hasher = Sha256::new();
        hasher.update(constant.to_be_bytes());
        let hash = hasher.finalize();
        let hash_u32 = u32::from_be_bytes([hash[0], hash[1], hash[2], hash[3]]);
        (hash_u32 % 16) + 8
    }

    /// Returns the SHA-256 digest of `data`.
    fn compute_content_hash(data: &[u8]) -> [u8; 32] {
        let mut hasher = Sha256::new();
        hasher.update(data);
        hasher.finalize().into()
    }
}
/// Two-component rolling checksum over a fixed-size byte window.
///
/// `hash_a` is the wrapping sum of the bytes fed in; `hash_b` is the wrapping
/// sum of the successive `hash_a` values.
pub struct RollingHash {
    window_size: usize,
    window: Vec<u8>,
    position: usize,
    hash_a: u64,
    hash_b: u64,
}

impl RollingHash {
    /// Creates an empty hash for a window of `window_size` bytes.
    ///
    /// A `window_size` of zero is raised to one.
    pub fn new(window_size: usize) -> Self {
        let window_size = window_size.max(1);
        Self {
            window_size,
            window: vec![0; window_size],
            position: 0,
            hash_a: 0,
            hash_b: 0,
        }
    }

    /// Feeds one byte while the window is still filling.
    ///
    /// Once `window_size` bytes have been counted, further calls do nothing.
    pub fn update(&mut self, byte: u8) {
        if self.position >= self.window_size {
            return;
        }
        let slot = self.position;
        self.window[slot] = byte;
        self.hash_a = self.hash_a.wrapping_add(u64::from(byte));
        self.hash_b = self.hash_b.wrapping_add(self.hash_a);
        self.position = slot + 1;
    }

    /// Slides the window by one byte: `old_byte` leaves, `new_byte` enters.
    ///
    /// The caller supplies the outgoing byte; it is not read back from the
    /// internal buffer.
    pub fn roll(&mut self, old_byte: u8, new_byte: u8) {
        let outgoing = u64::from(old_byte);
        let incoming = u64::from(new_byte);
        let span = self.window_size as u64;

        let sum = self.hash_a.wrapping_sub(outgoing).wrapping_add(incoming);
        self.hash_a = sum;
        // The outgoing byte had been counted once per window position.
        let weighted = self.hash_b.wrapping_sub(span.wrapping_mul(outgoing));
        self.hash_b = weighted.wrapping_add(sum);

        let slot = self.position % self.window_size;
        self.window[slot] = new_byte;
        self.position += 1;
    }

    /// Returns the combined value: `hash_b` shifted into the upper 32 bits,
    /// OR-ed with the low 32 bits of `hash_a`.
    pub fn hash(&self) -> u64 {
        let upper = self.hash_b << 32;
        let lower = self.hash_a & 0xFFFF_FFFF;
        upper | lower
    }
}
/// Key under which a chunk is cached: content hash, size, capability scope
/// and verification metadata.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ChunkIdentity {
    /// SHA-256 digest of the chunk bytes.
    pub content_hash: [u8; 32],
    /// Length of the chunk in bytes.
    pub size_bytes: u64,
    /// Scope the chunk belongs to; `ChunkCache::can_reuse_chunk` treats an
    /// empty scope as reusable from any requesting scope.
    pub capability_scope: String,
    /// Algorithm name and proof strength recorded for the chunk.
    pub verification: ChunkVerification,
}

impl ChunkIdentity {
    /// Builds the identity of `data` within `capability_scope`.
    ///
    /// The content hash is the SHA-256 of `data`, the size is `data.len()`,
    /// and the verification algorithm is recorded as `"sha256"`.
    pub fn from_data(
        data: &[u8],
        capability_scope: &str,
        proof_strength: crate::atp::manifest::ProofStrength,
    ) -> Self {
        let verification = ChunkVerification {
            algorithm: String::from("sha256"),
            proof_strength,
        };
        Self {
            content_hash: Self::compute_content_hash(data),
            size_bytes: data.len() as u64,
            capability_scope: capability_scope.to_owned(),
            verification,
        }
    }

    /// Returns the SHA-256 digest of `data`.
    fn compute_content_hash(data: &[u8]) -> [u8; 32] {
        Sha256::digest(data).into()
    }

    /// Renders the identity as `<hex hash>:<size>:<scope>`.
    ///
    /// The verification metadata is not part of the string.
    pub fn identity_string(&self) -> String {
        let Self {
            content_hash,
            size_bytes,
            capability_scope,
            ..
        } = self;
        format!(
            "{}:{}:{}",
            hex_hash(content_hash),
            size_bytes,
            capability_scope
        )
    }
}
/// Size-bounded in-memory store of chunk data keyed by `ChunkIdentity`.
pub struct ChunkCache {
// Cached chunk payloads and their bookkeeping, keyed by full identity.
chunks: HashMap<ChunkIdentity, CachedChunk>,
// Secondary index: every cached identity that shares a given content hash.
content_hash_index: HashMap<[u8; 32], BTreeSet<ChunkIdentity>>,
// Sum of `size_bytes` over all cached chunks.
current_size: u64,
// Upper bound for `current_size`; `store_chunk` evicts entries to stay within it.
max_size: u64,
// Number of `lookup_chunk` calls that found the identity.
cache_hits: u64,
// Number of `lookup_chunk` calls that did not find the identity.
cache_misses: u64,
}
/// A chunk payload held by `ChunkCache`, together with its usage bookkeeping.
#[derive(Debug, Clone)]
pub struct CachedChunk {
/// The chunk bytes.
pub data: Vec<u8>,
/// Set when the chunk is stored and on every successful lookup; the entry
/// with the smallest value is the one chosen for eviction.
pub last_accessed: std::time::SystemTime,
/// Number of successful lookups since the chunk was (re)stored.
pub reuse_count: u32,
/// `ChunkCache::store_chunk` always sets this to `None`; presumably names the
/// object the chunk came from — TODO confirm against other writers.
pub source_object: Option<String>,
}
impl ChunkCache {
    /// Creates an empty cache that holds at most `max_size` bytes of chunk data.
    pub fn new(max_size: u64) -> Self {
        Self {
            chunks: HashMap::new(),
            content_hash_index: HashMap::new(),
            current_size: 0,
            max_size,
            cache_hits: 0,
            cache_misses: 0,
        }
    }

    /// Stores `data` under `identity`, evicting least-recently-used entries
    /// until the new chunk fits.
    ///
    /// An existing entry for the same identity is replaced (its reuse count
    /// starts again from zero).
    ///
    /// # Errors
    ///
    /// Returns `ChunkingProfileError::InvalidChunkParameters` when
    /// * `data.len()` does not fit in a `u64`,
    /// * `data.len()` differs from `identity.size_bytes`,
    /// * the chunk is larger than the cache's `max_size`, or
    /// * the SHA-256 of `data` differs from `identity.content_hash`.
    ///
    /// The cache is left untouched in all error cases.
    pub fn store_chunk(
        &mut self,
        identity: &ChunkIdentity,
        data: &[u8],
    ) -> Result<(), ChunkingProfileError> {
        let data_len = u64::try_from(data.len()).map_err(|_| {
            ChunkingProfileError::InvalidChunkParameters(
                "chunk data length exceeds supported size".to_string(),
            )
        })?;
        if data_len != identity.size_bytes {
            return Err(ChunkingProfileError::InvalidChunkParameters(
                "chunk data size doesn't match identity".to_string(),
            ));
        }
        if data_len > self.max_size {
            return Err(ChunkingProfileError::InvalidChunkParameters(
                "chunk data exceeds cache size limit".to_string(),
            ));
        }
        if ChunkIdentity::compute_content_hash(data) != identity.content_hash {
            return Err(ChunkingProfileError::InvalidChunkParameters(
                "chunk data hash doesn't match identity".to_string(),
            ));
        }

        // Drop any previous copy first so its bytes are not counted twice.
        self.remove_chunk(identity);

        // Make room: after this loop `current_size + data_len <= max_size`.
        let target_size = self.max_size.saturating_sub(data_len);
        while self.current_size > target_size && !self.chunks.is_empty() {
            self.evict_least_recently_used();
        }

        let cached_chunk = CachedChunk {
            data: data.to_vec(),
            last_accessed: std::time::SystemTime::now(),
            reuse_count: 0,
            source_object: None,
        };
        self.current_size = self.current_size.saturating_add(data_len);
        self.content_hash_index
            .entry(identity.content_hash)
            .or_default()
            .insert(identity.clone());
        self.chunks.insert(identity.clone(), cached_chunk);
        Ok(())
    }

    /// Returns a copy of the chunk stored under `identity`, if any.
    ///
    /// A hit refreshes the entry's access time and bumps its reuse count and
    /// the cache's hit counter; a miss bumps the miss counter. All counters
    /// saturate instead of overflowing.
    pub fn lookup_chunk(&mut self, identity: &ChunkIdentity) -> Option<Vec<u8>> {
        match self.chunks.get_mut(identity) {
            Some(chunk) => {
                chunk.last_accessed = std::time::SystemTime::now();
                chunk.reuse_count = chunk.reuse_count.saturating_add(1);
                self.cache_hits = self.cache_hits.saturating_add(1);
                Some(chunk.data.clone())
            }
            None => {
                self.cache_misses = self.cache_misses.saturating_add(1);
                None
            }
        }
    }

    /// Same as [`ChunkCache::lookup_chunk`], wrapped in `Ok`.
    ///
    /// # Errors
    ///
    /// This implementation never returns `Err`.
    pub fn retrieve_chunk(
        &mut self,
        identity: &ChunkIdentity,
    ) -> Result<Option<Vec<u8>>, ChunkingProfileError> {
        Ok(self.lookup_chunk(identity))
    }

    /// Returns every cached identity whose content hash equals `content_hash`,
    /// in the identities' `Ord` order. Does not touch hit/miss counters.
    pub fn find_similar_chunks(&self, content_hash: [u8; 32]) -> Vec<&ChunkIdentity> {
        self.content_hash_index
            .get(&content_hash)
            .map(|identities| identities.iter().collect())
            .unwrap_or_default()
    }

    /// Returns whether a chunk may be reused from `requesting_scope`.
    ///
    /// A chunk with an empty capability scope is reusable from any scope;
    /// otherwise the scopes must be equal.
    pub fn can_reuse_chunk(&self, chunk_identity: &ChunkIdentity, requesting_scope: &str) -> bool {
        chunk_identity.capability_scope.is_empty()
            || chunk_identity.capability_scope == requesting_scope
    }

    /// Removes the entry with the oldest `last_accessed` time, if any.
    fn evict_least_recently_used(&mut self) {
        let oldest_identity = self
            .chunks
            .iter()
            .min_by_key(|(_, chunk)| chunk.last_accessed)
            .map(|(identity, _)| identity.clone());
        if let Some(identity) = oldest_identity {
            self.remove_chunk(&identity);
        }
    }

    /// Removes `identity` from the cache and from the content-hash index,
    /// and subtracts its size from `current_size`. No-op when absent.
    fn remove_chunk(&mut self, identity: &ChunkIdentity) {
        if self.chunks.remove(identity).is_none() {
            return;
        }
        self.current_size = self.current_size.saturating_sub(identity.size_bytes);
        if let Some(identities) = self.content_hash_index.get_mut(&identity.content_hash) {
            identities.remove(identity);
            // Do not keep empty index buckets around.
            if identities.is_empty() {
                self.content_hash_index.remove(&identity.content_hash);
            }
        }
    }

    /// Alias for [`ChunkCache::get_statistics`].
    pub fn stats(&self) -> ChunkCacheStats {
        self.get_statistics()
    }

    /// Returns a snapshot of the cache's counters.
    ///
    /// `total_reuse_count` is the saturating sum of the per-chunk reuse
    /// counts; `utilization` is `current_size / max_size`, or `0.0` for a
    /// zero-sized cache.
    pub fn get_statistics(&self) -> ChunkCacheStats {
        // Saturate rather than overflow when many chunks are heavily reused.
        let total_reuse_count = self
            .chunks
            .values()
            .fold(0u32, |total, chunk| total.saturating_add(chunk.reuse_count));
        let utilization = if self.max_size == 0 {
            0.0
        } else {
            self.current_size as f64 / self.max_size as f64
        };
        ChunkCacheStats {
            total_chunks: self.chunks.len(),
            current_size: self.current_size,
            max_size: self.max_size,
            total_reuse_count,
            utilization,
            cache_hits: self.cache_hits,
            cache_misses: self.cache_misses,
        }
    }
}
/// Snapshot of `ChunkCache` counters returned by `ChunkCache::get_statistics`.
#[derive(Debug, Clone)]
pub struct ChunkCacheStats {
/// Number of chunks currently cached.
pub total_chunks: usize,
/// Bytes of chunk data currently accounted for.
pub current_size: u64,
/// Configured byte capacity of the cache.
pub max_size: u64,
/// Sum of `reuse_count` over all cached chunks.
pub total_reuse_count: u32,
/// `current_size / max_size`, or `0.0` when `max_size` is zero.
pub utilization: f64,
/// Lookups that found their chunk.
pub cache_hits: u64,
/// Lookups that did not find their chunk.
pub cache_misses: u64,
}
/// Tracks which chunks each transfer registered and how much reuse it achieved,
/// on top of a shared `ChunkCache`.
pub struct ChunkReuseManager {
// Backing store for chunk payloads.
cache: ChunkCache,
// Identities registered per transfer id, in registration order.
transfer_chunks: BTreeMap<String, Vec<ChunkIdentity>>,
// Reuse counters per transfer id, created on the first recorded reuse.
transfer_stats: BTreeMap<String, TransferReuseStats>,
}
/// Per-transfer reuse counters maintained by `ChunkReuseManager::register_chunk_reuse`.
#[derive(Debug, Clone)]
pub struct TransferReuseStats {
/// Number of reuse events recorded for the transfer.
pub total_chunks_reused: u64,
/// Sum of `size_bytes` over the reused chunks.
pub bytes_saved: u64,
/// Computed as `bytes_saved / (bytes_saved + 1_000_000.0)`.
pub deduplication_ratio: f64,
}
impl ChunkReuseManager {
    /// Creates a manager backed by a 100 MiB chunk cache.
    pub fn new() -> Self {
        const DEFAULT_CACHE_BYTES: u64 = 100 * 1024 * 1024;
        Self {
            cache: ChunkCache::new(DEFAULT_CACHE_BYTES),
            transfer_chunks: BTreeMap::new(),
            transfer_stats: BTreeMap::new(),
        }
    }

    /// Records that `transfer_id` uses the chunk `identity`.
    ///
    /// # Errors
    ///
    /// This implementation never returns `Err`.
    pub fn register_transfer_chunk(
        &mut self,
        transfer_id: &str,
        identity: &ChunkIdentity,
    ) -> Result<(), ChunkingProfileError> {
        self.transfer_chunks
            .entry(transfer_id.to_string())
            .or_default()
            .push(identity.clone());
        Ok(())
    }

    /// Determines the capability scope a transfer requests chunks under.
    ///
    /// * No chunks registered for the transfer: its private scope,
    ///   `transfer_scope(transfer_id)`.
    /// * All registered chunks with a non-empty scope agree: that scope.
    /// * Only empty-scope chunks registered: the empty scope.
    /// * Registered chunks carry two different non-empty scopes: `None`.
    fn capability_scope_for_transfer(&self, transfer_id: &str) -> Option<String> {
        let Some(identities) = self.transfer_chunks.get(transfer_id) else {
            return Some(transfer_scope(transfer_id));
        };

        let mut registered_scope: Option<&str> = None;
        for identity in identities {
            let scope = identity.capability_scope.as_str();
            if scope.is_empty() {
                continue;
            }
            match registered_scope {
                // Two distinct private scopes: the transfer's scope is ambiguous.
                Some(existing) if existing != scope => return None,
                Some(_) => {}
                None => registered_scope = Some(scope),
            }
        }
        Some(registered_scope.unwrap_or("").to_string())
    }

    /// Lists cached chunks matching any of `content_hashes` that `transfer_id`
    /// is allowed to reuse.
    ///
    /// A chunk is reusable when its capability scope is empty or equals the
    /// transfer's scope. When the transfer's scope is ambiguous (conflicting
    /// registered scopes) the empty scope is used, so only empty-scope chunks
    /// qualify. Results follow the order of `content_hashes`; a hash listed
    /// twice contributes its matches twice.
    ///
    /// NOTE(review): `_criteria` is accepted but not consulted; none of its
    /// limits are applied here.
    pub fn find_reusable_chunks(
        &self,
        transfer_id: &str,
        content_hashes: &[[u8; 32]],
        _criteria: &ChunkReuseCriteria,
    ) -> Vec<ChunkIdentity> {
        let requesting_scope = self
            .capability_scope_for_transfer(transfer_id)
            .unwrap_or_default();

        let mut reusable = Vec::new();
        for &hash in content_hashes {
            reusable.extend(
                self.cache
                    .find_similar_chunks(hash)
                    .into_iter()
                    .filter(|chunk| self.cache.can_reuse_chunk(chunk, &requesting_scope))
                    .cloned(),
            );
        }
        reusable
    }

    /// Records that `transfer_id` reused the chunk `identity`.
    ///
    /// Increments the transfer's reuse count, adds the chunk size to its
    /// saved bytes (both saturating) and recomputes its deduplication ratio.
    /// `_source_transfer_id` is not used.
    ///
    /// # Errors
    ///
    /// This implementation never returns `Err`.
    pub fn register_chunk_reuse(
        &mut self,
        transfer_id: &str,
        identity: &ChunkIdentity,
        _source_transfer_id: &str,
    ) -> Result<(), ChunkingProfileError> {
        let stats = self
            .transfer_stats
            .entry(transfer_id.to_string())
            .or_insert_with(|| TransferReuseStats {
                total_chunks_reused: 0,
                bytes_saved: 0,
                deduplication_ratio: 0.0,
            });
        stats.total_chunks_reused = stats.total_chunks_reused.saturating_add(1);
        stats.bytes_saved = stats.bytes_saved.saturating_add(identity.size_bytes);
        // NOTE(review): the 1_000_000.0 term is a fixed constant, not the
        // transfer's real size; confirm this is the intended estimate.
        let saved = stats.bytes_saved as f64;
        stats.deduplication_ratio = saved / (saved + 1_000_000.0);
        Ok(())
    }

    /// Returns a copy of the reuse counters for `transfer_id`, or `None` when
    /// no reuse has been recorded for it.
    pub fn get_reuse_statistics(&self, transfer_id: &str) -> Option<TransferReuseStats> {
        self.transfer_stats.get(transfer_id).cloned()
    }

    /// Caches `chunk_data` under the transfer's private scope and registers it
    /// for `transfer_id`.
    ///
    /// The identity is built with `ProofStrength::Basic`.
    ///
    /// # Errors
    ///
    /// Propagates the error from `ChunkCache::store_chunk` (for example when
    /// the chunk is larger than the cache); nothing is registered in that case.
    pub fn store_chunk_for_reuse(
        &mut self,
        chunk_data: &[u8],
        transfer_id: &str,
    ) -> Result<ChunkIdentity, ChunkingProfileError> {
        let identity = ChunkIdentity::from_data(
            chunk_data,
            &transfer_scope(transfer_id),
            crate::atp::manifest::ProofStrength::Basic,
        );
        self.cache.store_chunk(&identity, chunk_data)?;
        self.register_transfer_chunk(transfer_id, &identity)?;
        Ok(identity)
    }
}
/// Encodes a 32-byte hash as 64 lowercase hexadecimal characters.
fn hex_hash(hash: &[u8; 32]) -> String {
    const HEX_DIGITS: &[u8; 16] = b"0123456789abcdef";
    // One allocation for the whole string instead of one `format!` per byte.
    let mut encoded = String::with_capacity(hash.len() * 2);
    for &byte in hash {
        encoded.push(char::from(HEX_DIGITS[usize::from(byte >> 4)]));
        encoded.push(char::from(HEX_DIGITS[usize::from(byte & 0x0f)]));
    }
    encoded
}
/// Returns the private capability scope name for a transfer:
/// the transfer id prefixed with `transfer-`.
fn transfer_scope(transfer_id: &str) -> String {
    const PREFIX: &str = "transfer-";
    let mut scope = String::with_capacity(PREFIX.len() + transfer_id.len());
    scope.push_str(PREFIX);
    scope.push_str(transfer_id);
    scope
}
// Unit tests for scope-based chunk reuse and for ChunkCache size accounting.
#[cfg(test)]
mod active_tests {
use super::*;
use crate::atp::manifest::ProofStrength;
// Reuse criteria shared by the tests below.
fn criteria() -> ChunkReuseCriteria {
ChunkReuseCriteria {
max_age_seconds: 3600,
min_proof_strength: ProofStrength::Basic,
require_same_algorithm: true,
}
}
// A transfer can find the chunk it stored itself under its private scope.
#[test]
fn same_transfer_reuses_own_scoped_chunk() {
let mut manager = ChunkReuseManager::new();
let identity = manager
.store_chunk_for_reuse(b"chunk-data", "transfer-a")
.unwrap();
let reusable =
manager.find_reusable_chunks("transfer-a", &[identity.content_hash], &criteria());
assert_eq!(reusable, vec![identity]);
}
// A transfer with no registered chunks gets its own private scope, which does
// not match the scope of another transfer's chunk.
#[test]
fn different_transfer_cannot_reuse_private_scope() {
let mut manager = ChunkReuseManager::new();
let identity = manager
.store_chunk_for_reuse(b"chunk-data", "transfer-a")
.unwrap();
let reusable =
manager.find_reusable_chunks("transfer-b", &[identity.content_hash], &criteria());
assert!(reusable.is_empty());
}
// A transfer registered with two different non-empty scopes has no single
// scope; lookup then uses the empty scope, so only the empty-scope chunk matches.
#[test]
fn conflicting_registered_scopes_fail_closed_to_global_only_reuse() {
let mut manager = ChunkReuseManager::new();
let private_a = ChunkIdentity::from_data(b"aaa", "scope-a", ProofStrength::Basic);
let private_b = ChunkIdentity::from_data(b"bbb", "scope-b", ProofStrength::Basic);
let global = ChunkIdentity::from_data(b"ccc", "", ProofStrength::Basic);
manager
.register_transfer_chunk("mixed", &private_a)
.unwrap();
manager
.register_transfer_chunk("mixed", &private_b)
.unwrap();
manager.cache.store_chunk(&private_a, b"aaa").unwrap();
manager.cache.store_chunk(&private_b, b"bbb").unwrap();
manager.cache.store_chunk(&global, b"ccc").unwrap();
let reusable = manager.find_reusable_chunks(
"mixed",
&[
private_a.content_hash,
private_b.content_hash,
global.content_hash,
],
&criteria(),
);
assert_eq!(reusable, vec![global]);
}
// Storing the same identity twice replaces the entry instead of counting its
// bytes twice.
#[test]
fn replacing_same_identity_does_not_inflate_cache_size() {
let data = b"repeat";
let identity = ChunkIdentity::from_data(data, "scope-a", ProofStrength::Basic);
let mut cache = ChunkCache::new(1024);
cache.store_chunk(&identity, data).unwrap();
cache.store_chunk(&identity, data).unwrap();
let stats = cache.get_statistics();
assert_eq!(stats.total_chunks, 1);
assert_eq!(stats.current_size, data.len() as u64);
}
// A chunk larger than the whole cache is rejected and leaves the cache empty.
#[test]
fn oversized_chunk_is_rejected_without_cache_growth() {
let data = b"too-large";
let identity = ChunkIdentity::from_data(data, "scope-a", ProofStrength::Basic);
let mut cache = ChunkCache::new(1);
let err = cache.store_chunk(&identity, data).unwrap_err();
assert!(matches!(
err,
ChunkingProfileError::InvalidChunkParameters(_)
));
assert_eq!(cache.get_statistics().current_size, 0);
}
// Utilization must not divide by zero when the cache capacity is zero.
#[test]
fn zero_sized_cache_reports_zero_utilization() {
let cache = ChunkCache::new(0);
assert_eq!(cache.get_statistics().utilization, 0.0);
}
}