use bytes::{Bytes, BytesMut};
use fastcdc::v2020::{FastCDC, Normalization};
use super::blob_tree::ChunkingStrategy;
use super::error::BlobError;
/// Capability string `"dataforts:blob-cdc-supported"`.
///
/// NOTE(review): not referenced by the code in this module; presumably advertised or queried
/// elsewhere to decide whether CDC is available (e.g. from a `CdcSupportProbe::Dynamic`
/// closure) — confirm against its users.
pub const DATAFORTS_BLOB_CDC_SUPPORTED: &str = "dataforts:blob-cdc-supported";
/// Decides whether content-defined chunking (CDC) may be used; consulted by
/// [`cdc_downgrade`].
///
/// The default is [`CdcSupportProbe::AlwaysSupported`].
#[derive(Default)]
pub enum CdcSupportProbe {
    /// [`check`](CdcSupportProbe::check) always answers `true`.
    #[default]
    AlwaysSupported,
    /// [`check`](CdcSupportProbe::check) always answers `false`, forcing fixed-size chunking.
    ForceFixed,
    /// [`check`](CdcSupportProbe::check) calls the closure on every invocation, so the answer
    /// may change between calls.
    Dynamic(Box<dyn Fn() -> bool + Send + Sync>),
}

impl CdcSupportProbe {
    /// Reports whether CDC is supported according to this probe right now.
    pub fn check(&self) -> bool {
        match self {
            Self::Dynamic(probe) => probe(),
            Self::ForceFixed => false,
            Self::AlwaysSupported => true,
        }
    }
}

// Written by hand because the boxed closure in `Dynamic` has no `Debug` impl; the closure is
// rendered as `..`.
impl std::fmt::Debug for CdcSupportProbe {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::AlwaysSupported => "CdcSupportProbe::AlwaysSupported",
            Self::ForceFixed => "CdcSupportProbe::ForceFixed",
            Self::Dynamic(_) => "CdcSupportProbe::Dynamic(..)",
        };
        f.write_str(label)
    }
}
/// Replaces a CDC chunking strategy with fixed-size chunking when `probe` rejects CDC.
///
/// Non-CDC strategies are returned unchanged and `probe` is not consulted for them, so a
/// [`CdcSupportProbe::Dynamic`] closure only runs when its answer matters. A rejected CDC
/// strategy becomes `ChunkingStrategy::Fixed` sized by `blob_ref::BLOB_CHUNK_SIZE_BYTES`.
pub fn cdc_downgrade(chunking: ChunkingStrategy, probe: &CdcSupportProbe) -> ChunkingStrategy {
    let wants_cdc = matches!(chunking, ChunkingStrategy::Cdc { .. });
    // `&&` short-circuits: the probe is only evaluated for CDC strategies.
    if wants_cdc && !probe.check() {
        // NOTE(review): `as u32` silently truncates if BLOB_CHUNK_SIZE_BYTES is wider than
        // u32 and ever exceeds u32::MAX — confirm its type.
        let size = super::blob_ref::BLOB_CHUNK_SIZE_BYTES as u32;
        ChunkingStrategy::Fixed { size }
    } else {
        chunking
    }
}
/// Minimum, target-average and maximum chunk sizes, in bytes, handed to FastCDC.
///
/// `max` is the length at which a cut is forced even without a content-defined boundary.
/// Values are not checked on construction; use [`CdcParams::validate`] (which
/// `CdcStreamChunker::new` calls) before chunking with them.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct CdcParams {
    /// Minimum chunk size in bytes.
    pub min: u32,
    /// Target average chunk size in bytes.
    pub avg: u32,
    /// Maximum chunk size in bytes.
    pub max: u32,
}

/// Production sizing: 1 MiB / 4 MiB / 16 MiB. Each value equals the upper bound that
/// [`CdcParams::validate`] accepts for that parameter.
pub const PRODUCTION_CDC_PARAMS: CdcParams = CdcParams {
    min: 1 << 20,
    avg: 4 << 20,
    max: 16 << 20,
};
impl CdcParams {
pub fn from_strategy(strategy: ChunkingStrategy) -> Option<Self> {
match strategy {
ChunkingStrategy::Cdc { min, avg, max } => Some(Self { min, avg, max }),
ChunkingStrategy::Fixed { .. } => None,
}
}
pub fn validate(&self) -> Result<(), BlobError> {
const MIN_MIN: u32 = 64;
const MIN_MAX: u32 = 1_048_576;
const AVG_MIN: u32 = 256;
const AVG_MAX: u32 = 4_194_304;
const MAX_MIN: u32 = 1024;
const MAX_MAX: u32 = 16_777_216;
if self.min < MIN_MIN || self.min > MIN_MAX {
return Err(BlobError::Backend(format!(
"CDC params: min {} outside [{}, {}]",
self.min, MIN_MIN, MIN_MAX
)));
}
if self.avg < AVG_MIN || self.avg > AVG_MAX {
return Err(BlobError::Backend(format!(
"CDC params: avg {} outside [{}, {}]",
self.avg, AVG_MIN, AVG_MAX
)));
}
if self.max < MAX_MIN || self.max > MAX_MAX {
return Err(BlobError::Backend(format!(
"CDC params: max {} outside [{}, {}]",
self.max, MAX_MIN, MAX_MAX
)));
}
if self.min >= self.avg || self.avg >= self.max {
return Err(BlobError::Backend(format!(
"CDC params: must hold min < avg < max; got min={} avg={} max={}",
self.min, self.avg, self.max
)));
}
Ok(())
}
}
/// Incremental content-defined chunker: feed bytes with `extend`, drain chunks whose cut point
/// is settled with `try_next_chunk`, and flush the remaining tail with `finalize`.
pub struct CdcStreamChunker {
    // Bytes received but not yet emitted; emitted chunks are split off the front.
    buffer: BytesMut,
    // Chunk-size parameters; validated by `CdcStreamChunker::new`.
    params: CdcParams,
    // Buffer length at the most recent `try_next_chunk` scan that found no settled cut. Used
    // to skip rescans until enough new data has arrived; `None` when no such scan is pending.
    last_unsuccessful_scan_len: Option<usize>,
}
impl CdcStreamChunker {
    /// Creates an empty chunker that cuts with `params`.
    ///
    /// The buffer is pre-allocated to `params.max` bytes, the largest chunk that can be cut.
    ///
    /// # Errors
    ///
    /// Propagates the error from [`CdcParams::validate`] when `params` is out of range or not
    /// strictly ordered, so invalid sizes never reach `FastCDC::with_level`.
    pub fn new(params: CdcParams) -> Result<Self, BlobError> {
        params.validate()?;
        Ok(Self {
            buffer: BytesMut::with_capacity(params.max as usize),
            params,
            last_unsuccessful_scan_len: None,
        })
    }

    /// Appends `bytes` to the pending buffer.
    ///
    /// No cutting happens here; call [`try_next_chunk`](Self::try_next_chunk) afterwards to
    /// drain chunks whose boundary is settled.
    pub fn extend(&mut self, bytes: &[u8]) {
        if bytes.is_empty() {
            return;
        }
        self.buffer.extend_from_slice(bytes);
        // Once `min` more bytes have arrived since the last fruitless scan, that scan's
        // result no longer justifies skipping a rescan.
        if let Some(prev) = self.last_unsuccessful_scan_len {
            if self.buffer.len() >= prev.saturating_add(self.params.min as usize) {
                self.last_unsuccessful_scan_len = None;
            }
        }
    }

    /// Cuts and returns the next chunk whose boundary is settled, or `None` when more input
    /// (or [`finalize`](Self::finalize)) is needed.
    ///
    /// A boundary is settled when FastCDC cuts strictly inside the buffer, or when the cut is
    /// a forced `params.max`-sized one. A cut at exactly the buffered length (other than a max
    /// cut) means no boundary was found yet; it may lie in bytes not received so far.
    #[must_use = "a returned chunk has been removed from the buffer; dropping it loses data"]
    pub fn try_next_chunk(&mut self) -> Option<Bytes> {
        if self.buffer.is_empty() {
            return None;
        }
        let buffered = self.buffer.len();
        let max = self.params.max as usize;
        if let Some(prev) = self.last_unsuccessful_scan_len {
            // Skip the O(len) rescan until `min` more bytes have arrived or a max-size cut
            // becomes possible. This relies on FastCDC's earliest cut not moving as data is
            // appended, so deferring only delays emission without changing the chunk (pinned
            // by the `byte_at_a_time_matches_single_extend` test). It turns byte-at-a-time
            // feeding from one rescan per byte into roughly `max / min` rescans per chunk.
            if buffered < prev.saturating_add(self.params.min as usize) && buffered < max {
                return None;
            }
        }
        let cut = self.first_cut_len()?;
        let is_max_cut = cut == max;
        let is_premature_eof = cut == buffered && !is_max_cut;
        if is_premature_eof {
            self.last_unsuccessful_scan_len = Some(buffered);
            return None;
        }
        self.last_unsuccessful_scan_len = None;
        Some(self.buffer.split_to(cut).freeze())
    }

    /// Flushes everything still buffered as final chunks, treating the end of the buffer as
    /// end of stream, and leaves the chunker empty and ready for a new stream.
    #[must_use = "the returned chunks have been removed from the buffer; dropping them loses data"]
    pub fn finalize(&mut self) -> Vec<Bytes> {
        // The cached scan length describes the buffer being drained. Leaving it set would make
        // a reused chunker defer scans of unrelated new data until it grew past the old length.
        self.last_unsuccessful_scan_len = None;
        let mut out = Vec::new();
        while let Some(cut) = self.first_cut_len() {
            out.push(self.buffer.split_to(cut).freeze());
        }
        out
    }

    /// Number of bytes received but not yet emitted as chunks.
    pub fn buffered_bytes(&self) -> usize {
        self.buffer.len()
    }

    /// The (validated) parameters this chunker cuts with.
    pub fn params(&self) -> CdcParams {
        self.params
    }

    /// Length of the first chunk FastCDC cuts from the current buffer; `None` when the buffer
    /// is empty.
    fn first_cut_len(&self) -> Option<usize> {
        if self.buffer.is_empty() {
            return None;
        }
        FastCDC::with_level(
            &self.buffer,
            self.params.min as usize,
            self.params.avg as usize,
            self.params.max as usize,
            Normalization::Level2,
        )
        .into_iter()
        .next()
        .map(|chunk| chunk.length)
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Small parameters so kilobyte-sized inputs produce many chunks.
    const TEST_PARAMS: CdcParams = CdcParams {
        min: 256,
        avg: 1024,
        max: 4096,
    };

    /// Reproducible pseudo-random bytes from a 64-bit LCG seeded with `seed`.
    fn deterministic_bytes(seed: u64, len: usize) -> Vec<u8> {
        let mut state = seed;
        (0..len)
            .map(|_| {
                state = state
                    .wrapping_mul(6364136223846793005)
                    .wrapping_add(1442695040888963407);
                (state >> 33) as u8
            })
            .collect()
    }

    /// Feeds `data` one byte at a time, draining after every byte, then finalizes.
    fn drip_feed(chunker: &mut CdcStreamChunker, data: &[u8]) -> Vec<Bytes> {
        let mut out = Vec::new();
        for b in data {
            chunker.extend(std::slice::from_ref(b));
            while let Some(c) = chunker.try_next_chunk() {
                out.push(c);
            }
        }
        out.extend(chunker.finalize());
        out
    }

    /// Concatenating every emitted chunk reproduces the input exactly.
    #[test]
    fn single_extend_then_drain_round_trips() {
        let payload = deterministic_bytes(1, 64 * 1024);
        let mut chunker = CdcStreamChunker::new(TEST_PARAMS).expect("TEST_PARAMS valid");
        chunker.extend(&payload);
        let mut chunks = Vec::new();
        while let Some(c) = chunker.try_next_chunk() {
            chunks.push(c);
        }
        chunks.extend(chunker.finalize());
        let reconstructed: Vec<u8> = chunks.iter().flatten().copied().collect();
        assert_eq!(reconstructed, payload);
    }

    /// Regression guard for the rescan cache: all-zero input never yields a content-defined
    /// cut, so without the cache every byte would trigger a full rescan.
    #[test]
    fn byte_at_a_time_terminates_in_bounded_time_under_pathological_input() {
        let params = CdcParams {
            min: 4 * 1024,
            avg: 16 * 1024,
            max: 64 * 1024,
        };
        let payload = vec![0u8; 256 * 1024];
        let mut chunker = CdcStreamChunker::new(params).expect("test params valid");
        let mut emitted_total = 0usize;
        let start = std::time::Instant::now();
        for b in &payload {
            chunker.extend(std::slice::from_ref(b));
            while let Some(c) = chunker.try_next_chunk() {
                emitted_total += c.len();
            }
        }
        let final_chunks = chunker.finalize();
        for c in &final_chunks {
            emitted_total += c.len();
        }
        let elapsed = start.elapsed();
        assert_eq!(emitted_total, payload.len());
        assert!(
            elapsed < std::time::Duration::from_secs(2),
            "byte-at-a-time CDC streaming took {:?} — pre-fix would have taken \
             tens of seconds at these parameters; the no-cut-scan cache should \
             keep this well under 100ms in release and under 2s in debug",
            elapsed,
        );
    }

    /// Chunk boundaries must not depend on how the input is split across `extend` calls.
    #[test]
    fn byte_at_a_time_matches_single_extend() {
        let payload = deterministic_bytes(2, 16 * 1024);
        let mut bulk = CdcStreamChunker::new(TEST_PARAMS).expect("TEST_PARAMS valid");
        bulk.extend(&payload);
        let mut bulk_chunks = Vec::new();
        while let Some(c) = bulk.try_next_chunk() {
            bulk_chunks.push(c);
        }
        bulk_chunks.extend(bulk.finalize());
        let mut drip = CdcStreamChunker::new(TEST_PARAMS).expect("TEST_PARAMS valid");
        let mut drip_chunks = Vec::new();
        for b in &payload {
            drip.extend(std::slice::from_ref(b));
            while let Some(c) = drip.try_next_chunk() {
                drip_chunks.push(c);
            }
        }
        drip_chunks.extend(drip.finalize());
        assert_eq!(drip_chunks, bulk_chunks);
    }

    /// Two independent runs over the same input produce identical chunks.
    #[test]
    fn determinism_across_runs() {
        let payload = deterministic_bytes(3, 32 * 1024);
        let chunk_run = |params: CdcParams, data: &[u8]| -> Vec<Bytes> {
            let mut c = CdcStreamChunker::new(params).expect("test params valid");
            c.extend(data);
            let mut out = Vec::new();
            while let Some(ch) = c.try_next_chunk() {
                out.push(ch);
            }
            out.extend(c.finalize());
            out
        };
        let a = chunk_run(TEST_PARAMS, &payload);
        let b = chunk_run(TEST_PARAMS, &payload);
        assert_eq!(a, b);
    }

    /// Pins the under-`min` behaviour with production params: no cut is settled before
    /// `finalize`, which then emits the whole input as one chunk.
    ///
    /// NOTE(review): despite the name, no golden chunk hash or boundary list is pinned here,
    /// so a fastcdc upgrade that moves boundaries for inputs larger than `min` would not be
    /// caught. TODO: pin known boundaries for an input spanning several chunks.
    #[test]
    fn cross_version_determinism_pinned_against_known_input() {
        let payload = deterministic_bytes(42, 256 * 1024);
        let mut chunker =
            CdcStreamChunker::new(PRODUCTION_CDC_PARAMS).expect("PRODUCTION_CDC_PARAMS valid");
        chunker.extend(&payload);
        assert!(
            chunker.try_next_chunk().is_none(),
            "input under params.min must defer cut to finalize"
        );
        let final_chunks = chunker.finalize();
        assert_eq!(
            final_chunks.len(),
            1,
            "256 KiB input under PRODUCTION_CDC_PARAMS.min should produce exactly 1 chunk"
        );
        assert_eq!(
            final_chunks[0].len(),
            256 * 1024,
            "lone chunk must cover the entire input"
        );
        let chunk_hash: [u8; 32] = blake3::hash(&final_chunks[0]).into();
        let expected_hash: [u8; 32] = blake3::hash(&payload).into();
        assert_eq!(
            chunk_hash, expected_hash,
            "single-chunk EOF output must be byte-identical to the input"
        );
    }

    /// A single flipped byte should only disturb nearby chunks, keeping most chunks shared.
    #[test]
    fn one_byte_edit_dedup_majority() {
        let mut payload = deterministic_bytes(4, 128 * 1024);
        let original = payload.clone();
        payload[64 * 1024] ^= 0xFF;
        let chunk_set = |data: &[u8]| -> std::collections::HashSet<Bytes> {
            let mut c = CdcStreamChunker::new(TEST_PARAMS).expect("TEST_PARAMS valid");
            c.extend(data);
            let mut out = std::collections::HashSet::new();
            while let Some(ch) = c.try_next_chunk() {
                out.insert(ch);
            }
            for ch in c.finalize() {
                out.insert(ch);
            }
            out
        };
        let orig_chunks = chunk_set(&original);
        let edited_chunks = chunk_set(&payload);
        let intersection: usize = orig_chunks.intersection(&edited_chunks).count();
        let union: usize = orig_chunks.union(&edited_chunks).count();
        let dedup_ratio = intersection as f64 / union as f64;
        assert!(
            dedup_ratio > 0.75,
            "single-byte edit dedup ratio {} < 0.75; CDC boundaries \
             may be cascading instead of localising",
            dedup_ratio
        );
    }

    /// No emitted chunk, streamed or finalized, exceeds `max`.
    #[test]
    fn every_chunk_under_max() {
        let payload = deterministic_bytes(5, 64 * 1024);
        let mut chunker = CdcStreamChunker::new(TEST_PARAMS).expect("TEST_PARAMS valid");
        chunker.extend(&payload);
        while let Some(c) = chunker.try_next_chunk() {
            assert!(
                c.len() <= TEST_PARAMS.max as usize,
                "chunk len {} exceeds max {}",
                c.len(),
                TEST_PARAMS.max
            );
        }
        for c in chunker.finalize() {
            assert!(c.len() <= TEST_PARAMS.max as usize);
        }
    }

    /// Input without content-defined boundaries falls back to max-sized cuts.
    #[test]
    fn all_zero_input_forces_max_cuts() {
        let payload = vec![0u8; 32 * 1024];
        let mut chunker = CdcStreamChunker::new(TEST_PARAMS).expect("TEST_PARAMS valid");
        chunker.extend(&payload);
        let mut chunks = Vec::new();
        while let Some(c) = chunker.try_next_chunk() {
            chunks.push(c);
        }
        chunks.extend(chunker.finalize());
        for (i, c) in chunks.iter().enumerate() {
            if i + 1 < chunks.len() {
                assert_eq!(
                    c.len(),
                    TEST_PARAMS.max as usize,
                    "non-final chunk at index {} should be max-sized",
                    i
                );
            }
        }
    }

    /// The constructor refuses parameters that `validate` rejects.
    #[test]
    fn new_rejects_invalid_params() {
        let bad = CdcParams {
            min: 1024,
            avg: 1024,
            max: 4096,
        };
        let res = CdcStreamChunker::new(bad);
        assert!(res.is_err(), "CdcStreamChunker::new must reject min == avg",);
        let bad = CdcParams {
            min: 1024,
            avg: 1_000_000_000,
            max: 2_000_000_000,
        };
        assert!(CdcStreamChunker::new(bad).is_err());
        assert!(CdcStreamChunker::new(PRODUCTION_CDC_PARAMS).is_ok());
    }

    /// Out-of-range values and ordering violations are all rejected.
    #[test]
    fn validate_rejects_bad_params() {
        assert!(CdcParams {
            min: 1,
            avg: 1024,
            max: 4096
        }
        .validate()
        .is_err());
        assert!(CdcParams {
            min: 1024,
            avg: 5_000_000,
            max: 16_000_000
        }
        .validate()
        .is_err());
        assert!(CdcParams {
            min: 4096,
            avg: 1024,
            max: 8192
        }
        .validate()
        .is_err());
        assert!(CdcParams {
            min: 1024,
            avg: 1024,
            max: 4096
        }
        .validate()
        .is_err());
        assert!(CdcParams {
            min: 1024,
            avg: 4096,
            max: 4096
        }
        .validate()
        .is_err());
        assert!(PRODUCTION_CDC_PARAMS.validate().is_ok());
    }

    /// The fixed probe variants and the default give constant answers.
    #[test]
    fn cdc_support_probe_static_variants() {
        assert!(CdcSupportProbe::AlwaysSupported.check());
        assert!(!CdcSupportProbe::ForceFixed.check());
        assert!(CdcSupportProbe::default().check());
    }

    /// `Dynamic` re-evaluates its closure on every `check`.
    #[test]
    fn cdc_support_probe_dynamic_consults_closure() {
        use std::sync::atomic::{AtomicBool, Ordering};
        use std::sync::Arc;
        let flag = Arc::new(AtomicBool::new(false));
        let f = flag.clone();
        let probe = CdcSupportProbe::Dynamic(Box::new(move || f.load(Ordering::Relaxed)));
        assert!(!probe.check());
        flag.store(true, Ordering::Relaxed);
        assert!(probe.check());
    }

    /// Only a rejected CDC strategy is rewritten; fixed strategies pass through untouched.
    #[test]
    fn cdc_downgrade_substitutes_only_for_cdc_on_reject() {
        let cdc = ChunkingStrategy::Cdc {
            min: 1024 * 1024,
            avg: 4 * 1024 * 1024,
            max: 16 * 1024 * 1024,
        };
        let fixed = ChunkingStrategy::Fixed {
            size: 4 * 1024 * 1024,
        };
        assert_eq!(cdc_downgrade(cdc, &CdcSupportProbe::AlwaysSupported), cdc);
        assert_eq!(
            cdc_downgrade(fixed, &CdcSupportProbe::AlwaysSupported),
            fixed
        );
        let downgraded = cdc_downgrade(cdc, &CdcSupportProbe::ForceFixed);
        assert_eq!(
            downgraded,
            ChunkingStrategy::Fixed {
                size: super::super::blob_ref::BLOB_CHUNK_SIZE_BYTES as u32
            }
        );
        assert_eq!(cdc_downgrade(fixed, &CdcSupportProbe::ForceFixed), fixed);
    }

    /// With regular draining the buffer never grows beyond `max` plus one input slice.
    #[test]
    fn buffer_bound_holds() {
        let payload = deterministic_bytes(6, 100 * 1024);
        let mut chunker = CdcStreamChunker::new(TEST_PARAMS).expect("TEST_PARAMS valid");
        for slice in payload.chunks(128) {
            chunker.extend(slice);
            assert!(
                chunker.buffered_bytes() <= TEST_PARAMS.max as usize + 128,
                "buffer grew to {} bytes, expected ≤ max + slice_size",
                chunker.buffered_bytes()
            );
            while chunker.try_next_chunk().is_some() {}
        }
    }

    /// Empty input produces no chunks and leaves nothing buffered.
    #[test]
    fn empty_input_yields_no_chunks() {
        let mut chunker = CdcStreamChunker::new(TEST_PARAMS).expect("TEST_PARAMS valid");
        chunker.extend(&[]);
        assert_eq!(chunker.buffered_bytes(), 0);
        assert!(chunker.try_next_chunk().is_none());
        assert!(chunker.finalize().is_empty());
    }

    /// After `finalize`, the same instance can chunk a new stream exactly like a fresh one.
    #[test]
    fn reuse_after_finalize_matches_fresh_chunker() {
        let first = deterministic_bytes(7, 8 * 1024);
        let second = deterministic_bytes(8, 16 * 1024);

        let mut reused = CdcStreamChunker::new(TEST_PARAMS).expect("TEST_PARAMS valid");
        let first_chunks = drip_feed(&mut reused, &first);
        let first_total: usize = first_chunks.iter().map(|c| c.len()).sum();
        assert_eq!(first_total, first.len());
        assert_eq!(reused.buffered_bytes(), 0);

        let reused_chunks = drip_feed(&mut reused, &second);
        let mut fresh = CdcStreamChunker::new(TEST_PARAMS).expect("TEST_PARAMS valid");
        let fresh_chunks = drip_feed(&mut fresh, &second);
        assert_eq!(reused_chunks, fresh_chunks);
    }
}