use super::{
ChunkingProfileError,
profiles::{ChunkingProfile as ChunkingProfileTrait, utils},
};
use crate::atp::manifest::{
ChunkBoundary, ChunkMetadata, ChunkPlan, ChunkStrategy, ThroughputTier,
};
/// Chunking profile for large, opaque bulk files, split into fixed-size chunks.
///
/// This is a stateless marker type: all behavior lives in its `ChunkingProfile`
/// trait implementation and its inherent associated functions. The common traits
/// are derived so the profile can be logged, copied, and default-constructed.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct BulkFileProfile;
impl ChunkingProfileTrait for BulkFileProfile {
    /// Builds a fixed-size chunk plan whose sizes scale with `object_size_bytes`.
    ///
    /// The `(target, min, max)` triple comes from `compute_chunk_sizes`.
    /// Content-defined-chunking parameters are never set for this profile.
    fn chunk_plan(object_size_bytes: u64) -> ChunkPlan {
        let (target_chunk_size, min_chunk_size, max_chunk_size) =
            Self::compute_chunk_sizes(object_size_bytes);
        ChunkPlan {
            strategy: ChunkStrategy::FixedSize,
            target_chunk_size,
            min_chunk_size,
            max_chunk_size,
            cdc_params: None,
        }
    }

    /// Splits `data` into fixed-size chunks sized by [`Self::chunk_plan`].
    ///
    /// Chunks are emitted at the plan's `target_chunk_size`. Once the remaining bytes
    /// are no more than `target + min`, they are absorbed into one final chunk so no
    /// tiny tail is produced. If that merged tail would exceed the maximum chunk
    /// size, it is split into two near-equal halves instead. This can happen in the
    /// largest size tier, where target == max. The maximum is the smaller of the
    /// plan's `max_chunk_size` and `Self::max_chunk_size()`, the cap that
    /// [`Self::validate_boundaries`] enforces.
    ///
    /// Each boundary is tagged with `ChunkMetadata::BulkFile`, and its throughput
    /// tier is derived from the chunk size relative to the whole input.
    /// Empty input yields an empty vector.
    ///
    /// # Errors
    ///
    /// Returns [`ChunkingProfileError`] in these cases:
    /// - a size cannot be converted between `u64` and `usize`;
    /// - the target chunk size is zero;
    /// - position arithmetic overflows;
    /// - boundary construction or ordering validation in `utils` fails.
    fn compute_boundaries(data: &[u8]) -> Result<Vec<ChunkBoundary>, ChunkingProfileError> {
        if data.is_empty() {
            return Ok(Vec::new());
        }
        let data_len = utils::data_len_u64(data)?;
        let chunk_plan = Self::chunk_plan(data_len);
        let target_size = utils::u64_to_usize(chunk_plan.target_chunk_size, "target chunk size")?;
        let min_size = utils::u64_to_usize(chunk_plan.min_chunk_size, "minimum chunk size")?;
        // The merged tail must also respect the hard cap checked by `validate_boundaries`.
        let max_size = utils::u64_to_usize(
            chunk_plan.max_chunk_size.min(Self::max_chunk_size()),
            "maximum chunk size",
        )?;
        if target_size == 0 {
            // A zero step would never advance `current_pos` and would loop forever.
            return Err(ChunkingProfileError::InvalidChunkParameters(
                "bulk target chunk size must be non-zero".to_string(),
            ));
        }
        let merge_threshold =
            utils::checked_usize_add(target_size, min_size, "bulk remainder threshold")?;

        let mut positions = Vec::with_capacity(data.len() / target_size + 1);
        let mut current_pos = 0usize;
        while current_pos < data.len() {
            let remaining = data.len() - current_pos;
            let chunk_size = if remaining > merge_threshold {
                target_size
            } else if remaining <= max_size {
                // Absorb the whole tail so the final chunk is never tiny.
                remaining
            } else {
                // Merging would exceed the cap. Emit the larger (ceil) half now; the
                // smaller half becomes the final chunk on the next iteration. With the
                // current size table (target <= max and min <= max), both halves are
                // <= max and larger than max / 2.
                remaining - remaining / 2
            };
            current_pos = current_pos.checked_add(chunk_size).ok_or_else(|| {
                ChunkingProfileError::InvalidChunkParameters(
                    "bulk chunk position overflow".to_string(),
                )
            })?;
            positions.push(utils::usize_to_u64(current_pos, "bulk chunk boundary")?);
        }

        let boundaries = utils::positions_to_boundaries(
            data,
            &positions,
            ChunkStrategy::FixedSize,
            |_index, _offset, size, _chunk_data| {
                let throughput_tier = Self::determine_throughput_tier(size, data_len);
                ChunkMetadata::BulkFile { throughput_tier }
            },
        )?;
        utils::validate_boundary_ordering(&boundaries)?;
        Ok(boundaries)
    }

    /// Checks that `boundaries` could have been produced by this profile.
    ///
    /// Beyond the ordering checks in `utils::validate_boundary_ordering`, every
    /// boundary must satisfy all of the following:
    /// - it uses `ChunkStrategy::FixedSize`;
    /// - it carries `ChunkMetadata::BulkFile` metadata;
    /// - its size lies in `[absolute_min_chunk_size(), max_chunk_size()]`.
    ///
    /// # Errors
    ///
    /// Returns `ChunkingProfileError::InvalidChunkParameters` for the first violating
    /// boundary, or whatever error the ordering check reports.
    fn validate_boundaries(boundaries: &[ChunkBoundary]) -> Result<(), ChunkingProfileError> {
        utils::validate_boundary_ordering(boundaries)?;
        let min_allowed = Self::absolute_min_chunk_size();
        let max_allowed = Self::max_chunk_size();
        for boundary in boundaries {
            if !matches!(boundary.strategy, ChunkStrategy::FixedSize) {
                return Err(ChunkingProfileError::InvalidChunkParameters(
                    "bulk file profile requires fixed-size chunking".to_string(),
                ));
            }
            if !matches!(boundary.metadata, Some(ChunkMetadata::BulkFile { .. })) {
                return Err(ChunkingProfileError::InvalidChunkParameters(
                    "bulk file profile requires BulkFile metadata".to_string(),
                ));
            }
            if boundary.size_bytes < min_allowed {
                return Err(ChunkingProfileError::InvalidChunkParameters(format!(
                    "chunk size {} below minimum {}",
                    boundary.size_bytes, min_allowed
                )));
            }
            if boundary.size_bytes > max_allowed {
                return Err(ChunkingProfileError::InvalidChunkParameters(format!(
                    "chunk size {} above maximum {}",
                    boundary.size_bytes, max_allowed
                )));
            }
        }
        Ok(())
    }

    /// Returns 256 KiB.
    ///
    /// NOTE(review): presumably the object size below which callers skip chunking;
    /// confirm against the `ChunkingProfile` trait docs.
    fn min_chunking_threshold() -> u64 {
        256 * 1024
    }

    /// Largest chunk this profile accepts (16 MiB), enforced by `validate_boundaries`.
    fn max_chunk_size() -> u64 {
        16 * 1024 * 1024
    }

    /// Reports that this profile supports incremental chunking (always `true`).
    fn supports_incremental_chunking() -> bool {
        true
    }
}
impl BulkFileProfile {
    /// Picks the `(target, min, max)` chunk sizes, in bytes, for an object of
    /// `object_size_bytes`.
    ///
    /// The tiers are:
    /// - up to 1_048_576 bytes;
    /// - up to 100_000_000 bytes;
    /// - up to 1_000_000_000 bytes;
    /// - everything larger.
    ///
    /// Larger objects get larger chunks. In the top tier, target == max.
    fn compute_chunk_sizes(object_size_bytes: u64) -> (u64, u64, u64) {
        const KIB: u64 = 1024;
        const MIB: u64 = 1024 * KIB;

        if object_size_bytes <= 1_048_576 {
            (64 * KIB, 16 * KIB, 128 * KIB)
        } else if object_size_bytes <= 100_000_000 {
            (MIB, 256 * KIB, 2 * MIB)
        } else if object_size_bytes <= 1_000_000_000 {
            (4 * MIB, MIB, 8 * MIB)
        } else {
            (16 * MIB, 4 * MIB, 16 * MIB)
        }
    }

    /// Classifies a chunk of `chunk_size` bytes within an object of `total_size` bytes.
    ///
    /// A chunk is `Tail` when it is under 256 KiB or under 1% of the object. A zero
    /// `total_size` counts as a 0.0 share, so it always yields `Tail`. Otherwise the
    /// chunk is `Bulk` from 4 MiB upward, and `Standard` in between.
    fn determine_throughput_tier(chunk_size: u64, total_size: u64) -> ThroughputTier {
        const TAIL_BELOW_BYTES: u64 = 256 * 1024;
        const BULK_FROM_BYTES: u64 = 4 * 1024 * 1024;
        const TAIL_BELOW_SHARE: f64 = 0.01;

        let share_of_object = match total_size {
            0 => 0.0,
            total => chunk_size as f64 / total as f64,
        };
        let is_tail = chunk_size < TAIL_BELOW_BYTES || share_of_object < TAIL_BELOW_SHARE;
        if is_tail {
            return ThroughputTier::Tail;
        }
        if chunk_size >= BULK_FROM_BYTES {
            ThroughputTier::Bulk
        } else {
            ThroughputTier::Standard
        }
    }

    /// Hard lower bound (4 KiB) on the size of any chunk `validate_boundaries` accepts.
    const fn absolute_min_chunk_size() -> u64 {
        4_096
    }

    /// Derives a [`ChunkPlan`] adjusted for network conditions.
    ///
    /// Starts from `Self::chunk_plan(object_size_bytes)` and scales its sizes by a
    /// multiplier clamped to `[0.5, 4.0]`. The multiplier grows with `latency_ms`
    /// (relative to 50 ms) and shrinks as `bandwidth_mbps` rises (relative to
    /// 100 Mbps); at exactly 50 ms and 100 Mbps it is 2.0.
    ///
    /// Adjustments to the base sizes:
    /// - the minimum size is scaled by at most 2x;
    /// - target and max are capped at `max_chunk_size()`;
    /// - the minimum is raised to at least `absolute_min_chunk_size()`.
    pub fn chunk_plan_for_network(
        object_size_bytes: u64,
        bandwidth_mbps: u64,
        latency_ms: u64,
    ) -> ChunkPlan {
        let base = Self::chunk_plan(object_size_bytes);

        let latency_scale = (latency_ms as f64 / 50.0).clamp(0.5, 4.0);
        let bandwidth_scale = (bandwidth_mbps as f64 / 100.0).clamp(0.1, 10.0);
        let bandwidth_penalty = 2.0 / bandwidth_scale;
        let multiplier = (latency_scale * bandwidth_penalty).clamp(0.5, 4.0);

        // Float-to-int `as` casts saturate, so scaling can never wrap.
        let scale = |bytes: u64, factor: f64| (bytes as f64 * factor) as u64;
        let cap = Self::max_chunk_size();

        let target_chunk_size = scale(base.target_chunk_size, multiplier).min(cap);
        let min_chunk_size =
            scale(base.min_chunk_size, multiplier.min(2.0)).max(Self::absolute_min_chunk_size());
        let max_chunk_size = scale(base.max_chunk_size, multiplier).min(cap);

        ChunkPlan {
            strategy: base.strategy,
            target_chunk_size,
            min_chunk_size,
            max_chunk_size,
            cdc_params: None,
        }
    }

    /// Rough wall-clock estimate for sending `object_size_bytes` with `chunk_plan`.
    ///
    /// The estimate has two parts:
    /// - serialization time at `bandwidth_mbps`, with a bandwidth of 0 treated as 1;
    /// - `latency_ms` per chunk, where the chunk count is the object size divided by
    ///   the plan's target size (at least 1), rounded up with saturating arithmetic.
    ///
    /// The sum is truncated to whole milliseconds.
    pub fn estimate_transfer_time(
        object_size_bytes: u64,
        chunk_plan: &ChunkPlan,
        bandwidth_mbps: u64,
        latency_ms: u64,
    ) -> std::time::Duration {
        let per_chunk = chunk_plan.target_chunk_size.max(1);
        let chunk_count = object_size_bytes.saturating_add(per_chunk - 1) / per_chunk;

        let payload_bits = object_size_bytes as f64 * 8.0;
        // 1 Mbps == 1000 bits per millisecond.
        let bits_per_ms = bandwidth_mbps.max(1) as f64 * 1000.0;
        let wire_ms = payload_bits / bits_per_ms;
        let waiting_ms = chunk_count as f64 * latency_ms as f64;

        std::time::Duration::from_millis((wire_ms + waiting_ms) as u64)
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a boundary at index 0, offset 0 with the given size, strategy and metadata.
    fn boundary(
        size_bytes: u64,
        strategy: ChunkStrategy,
        metadata: Option<ChunkMetadata>,
    ) -> ChunkBoundary {
        ChunkBoundary {
            index: 0,
            byte_offset: 0,
            size_bytes,
            content_hash: [1; 32],
            strategy,
            metadata,
        }
    }

    /// Well-formed BulkFile metadata used by the validation tests.
    fn standard_metadata() -> Option<ChunkMetadata> {
        Some(ChunkMetadata::BulkFile {
            throughput_tier: ThroughputTier::Standard,
        })
    }

    #[test]
    fn chunk_sizes_scale_with_object_size() {
        let (target, min, max) = BulkFileProfile::compute_chunk_sizes(500_000);
        assert_eq!(target, 64 * 1024);
        assert_eq!(min, 16 * 1024);
        assert_eq!(max, 128 * 1024);
        let (target, min, max) = BulkFileProfile::compute_chunk_sizes(2_000_000_000);
        assert_eq!(target, 16 * 1024 * 1024);
        assert_eq!(min, 4 * 1024 * 1024);
        assert_eq!(max, 16 * 1024 * 1024);
    }

    #[test]
    fn chunk_size_tier_boundaries() {
        let small = (64 * 1024, 16 * 1024, 128 * 1024);
        let medium = (1024 * 1024, 256 * 1024, 2 * 1024 * 1024);
        let large = (4 * 1024 * 1024, 1024 * 1024, 8 * 1024 * 1024);
        let huge = (16 * 1024 * 1024, 4 * 1024 * 1024, 16 * 1024 * 1024);
        assert_eq!(BulkFileProfile::compute_chunk_sizes(0), small);
        assert_eq!(BulkFileProfile::compute_chunk_sizes(1_048_576), small);
        assert_eq!(BulkFileProfile::compute_chunk_sizes(1_048_577), medium);
        assert_eq!(BulkFileProfile::compute_chunk_sizes(100_000_000), medium);
        assert_eq!(BulkFileProfile::compute_chunk_sizes(100_000_001), large);
        assert_eq!(BulkFileProfile::compute_chunk_sizes(1_000_000_000), large);
        assert_eq!(BulkFileProfile::compute_chunk_sizes(1_000_000_001), huge);
    }

    #[test]
    fn chunk_plan_for_small_file() {
        let plan = BulkFileProfile::chunk_plan(100_000);
        assert_eq!(plan.strategy, ChunkStrategy::FixedSize);
        assert!(plan.target_chunk_size >= plan.min_chunk_size);
        assert!(plan.target_chunk_size <= plan.max_chunk_size);
        assert!(plan.cdc_params.is_none());
    }

    #[test]
    fn chunking_respects_size_constraints() {
        let data = vec![0u8; 1_000_000];
        let boundaries = BulkFileProfile::compute_boundaries(&data).unwrap();
        for boundary in &boundaries {
            assert!(boundary.size_bytes >= BulkFileProfile::absolute_min_chunk_size());
            assert!(boundary.size_bytes <= BulkFileProfile::max_chunk_size());
            assert!(matches!(boundary.strategy, ChunkStrategy::FixedSize));
        }
        let total_size: u64 = boundaries.iter().map(|b| b.size_bytes).sum();
        assert_eq!(total_size, data.len() as u64);
    }

    #[test]
    fn computed_boundaries_pass_validation() {
        // Round-trip: whatever the profile emits, its own validator must accept.
        for len in [4 * 1024, 100_000, 1_000_000, 2_000_000] {
            let data = vec![0u8; len];
            let boundaries = BulkFileProfile::compute_boundaries(&data).unwrap();
            BulkFileProfile::validate_boundaries(&boundaries).unwrap();
            let total: u64 = boundaries.iter().map(|b| b.size_bytes).sum();
            assert_eq!(total, len as u64);
        }
    }

    #[test]
    fn chunks_avoid_tiny_remainders() {
        let chunk_size = 64 * 1024;
        let data_size = chunk_size + 1000;
        let data = vec![0u8; data_size];
        let boundaries = BulkFileProfile::compute_boundaries(&data).unwrap();
        assert!(boundaries.len() <= 2, "Too many chunks for small remainder");
        for boundary in &boundaries {
            assert!(
                boundary.size_bytes >= 1000,
                "Chunk too small: {}",
                boundary.size_bytes
            );
        }
    }

    #[test]
    fn throughput_tier_classification() {
        let tier = BulkFileProfile::determine_throughput_tier(64 * 1024, 10 * 1024 * 1024);
        assert_eq!(tier, ThroughputTier::Tail);
        let tier = BulkFileProfile::determine_throughput_tier(8 * 1024 * 1024, 10 * 1024 * 1024);
        assert_eq!(tier, ThroughputTier::Bulk);
        let tier = BulkFileProfile::determine_throughput_tier(1024 * 1024, 10 * 1024 * 1024);
        assert_eq!(tier, ThroughputTier::Standard);
        // A zero total is treated as a 0.0 share, which always classifies as Tail.
        let tier = BulkFileProfile::determine_throughput_tier(8 * 1024 * 1024, 0);
        assert_eq!(tier, ThroughputTier::Tail);
    }

    #[test]
    fn network_adaptation_works() {
        let base_plan = BulkFileProfile::chunk_plan(10 * 1024 * 1024);
        let high_latency_plan = BulkFileProfile::chunk_plan_for_network(10 * 1024 * 1024, 100, 200);
        assert!(high_latency_plan.target_chunk_size > base_plan.target_chunk_size);
        let low_bandwidth_plan = BulkFileProfile::chunk_plan_for_network(10 * 1024 * 1024, 10, 50);
        assert!(low_bandwidth_plan.target_chunk_size > base_plan.target_chunk_size);
    }

    #[test]
    fn network_plan_respects_global_limits() {
        // Maximal multiplier on the largest tier must still be capped.
        let plan = BulkFileProfile::chunk_plan_for_network(2_000_000_000, 1, 1_000);
        assert!(plan.target_chunk_size <= BulkFileProfile::max_chunk_size());
        assert!(plan.max_chunk_size <= BulkFileProfile::max_chunk_size());
        assert!(plan.min_chunk_size <= plan.target_chunk_size);
        // Minimal multiplier on the smallest tier must respect the absolute floor.
        let plan = BulkFileProfile::chunk_plan_for_network(100_000, 10_000, 0);
        assert!(plan.min_chunk_size >= BulkFileProfile::absolute_min_chunk_size());
    }

    #[test]
    fn boundary_validation_catches_errors() {
        // Wrong strategy with an otherwise valid size, so only the strategy check can fire.
        let wrong_strategy =
            boundary(64 * 1024, ChunkStrategy::ContentDefined, standard_metadata());
        assert!(BulkFileProfile::validate_boundaries(&[wrong_strategy]).is_err());

        let too_small = boundary(1024, ChunkStrategy::FixedSize, standard_metadata());
        assert!(BulkFileProfile::validate_boundaries(&[too_small]).is_err());

        let too_large = boundary(
            BulkFileProfile::max_chunk_size() + 1,
            ChunkStrategy::FixedSize,
            standard_metadata(),
        );
        assert!(BulkFileProfile::validate_boundaries(&[too_large]).is_err());

        let missing_metadata = boundary(64 * 1024, ChunkStrategy::FixedSize, None);
        assert!(BulkFileProfile::validate_boundaries(&[missing_metadata]).is_err());

        // Control case: the same shape with every field valid is accepted.
        let valid = boundary(64 * 1024, ChunkStrategy::FixedSize, standard_metadata());
        assert!(BulkFileProfile::validate_boundaries(&[valid]).is_ok());
    }

    #[test]
    fn transfer_time_estimation() {
        let plan = BulkFileProfile::chunk_plan(10 * 1024 * 1024);
        let duration = BulkFileProfile::estimate_transfer_time(10 * 1024 * 1024, &plan, 100, 50);
        assert!(duration < std::time::Duration::from_secs(10));
        assert!(duration > std::time::Duration::from_millis(100));
    }

    #[test]
    fn profile_properties() {
        assert!(BulkFileProfile::supports_incremental_chunking());
        assert_eq!(BulkFileProfile::min_chunking_threshold(), 256 * 1024);
        assert_eq!(BulkFileProfile::max_chunk_size(), 16 * 1024 * 1024);
    }

    #[test]
    fn empty_data_handling() {
        let boundaries = BulkFileProfile::compute_boundaries(&[]).unwrap();
        assert!(boundaries.is_empty());
    }

    #[test]
    fn single_chunk_data() {
        let data = vec![0u8; 32 * 1024];
        let boundaries = BulkFileProfile::compute_boundaries(&data).unwrap();
        assert_eq!(boundaries.len(), 1);
        assert_eq!(boundaries[0].size_bytes, data.len() as u64);
        assert_eq!(boundaries[0].byte_offset, 0);
        assert_eq!(boundaries[0].index, 0);
    }
}