use crate::domain::errors::DistributionError;
use crate::domain::types::{DistributionPattern, ManyToManyMode, Payload};
pub const fn validate_cover_count(
pattern: &DistributionPattern,
cover_count: usize,
) -> Result<(), DistributionError> {
let needed = minimum_covers(pattern);
if cover_count < needed {
return Err(DistributionError::InsufficientCovers {
needed,
got: cover_count,
});
}
Ok(())
}
#[must_use]
pub const fn minimum_covers(pattern: &DistributionPattern) -> usize {
match pattern {
DistributionPattern::OneToOne | DistributionPattern::ManyToOne => 1,
DistributionPattern::OneToMany {
data_shards,
parity_shards,
} => {
(*data_shards as usize).strict_add(*parity_shards as usize)
}
DistributionPattern::ManyToMany { .. } => 2,
}
}
#[must_use]
pub fn assign_one_to_many(shard_count: usize, cover_count: usize) -> Vec<(usize, usize)> {
(0..shard_count).map(|i| (i, i % cover_count)).collect()
}
#[must_use]
pub fn assign_many_to_many(
mode: ManyToManyMode,
shard_count: usize,
cover_count: usize,
seed: u64,
) -> Vec<Vec<usize>> {
match mode {
ManyToManyMode::Replicate => {
let all_covers: Vec<usize> = (0..cover_count).collect();
(0..shard_count).map(|_| all_covers.clone()).collect()
}
ManyToManyMode::Stripe => {
(0..shard_count).map(|i| vec![i % cover_count]).collect()
}
ManyToManyMode::Diagonal => {
(0..shard_count)
.map(|i| {
let primary = i % cover_count;
let secondary = (i.strict_add(1)) % cover_count;
if primary == secondary {
vec![primary]
} else {
vec![primary, secondary]
}
})
.collect()
}
ManyToManyMode::Random => {
let mut state = seed;
(0..shard_count)
.map(|_| {
state = state
.wrapping_mul(6_364_136_223_846_793_005)
.wrapping_add(1_442_695_040_888_963_407);
let idx = (state >> 33) as usize % cover_count;
vec![idx]
})
.collect()
}
}
}
#[must_use]
pub fn pack_many_payloads(payloads: &[Payload]) -> Vec<u8> {
let mut buf = Vec::new();
#[expect(
clippy::cast_possible_truncation,
reason = "payload count bounded well below u32::MAX"
)]
let count = payloads.len() as u32;
buf.extend_from_slice(&count.to_le_bytes());
for p in payloads {
#[expect(
clippy::cast_possible_truncation,
reason = "individual payload size bounded below u32::MAX"
)]
let len = p.len() as u32;
buf.extend_from_slice(&len.to_le_bytes());
buf.extend_from_slice(p.as_bytes());
}
buf
}
pub fn unpack_many_payloads(data: &[u8]) -> Result<Vec<Payload>, DistributionError> {
let header = data.get(..4).ok_or(DistributionError::InsufficientCovers {
needed: 4,
got: data.len(),
})?;
let count = u32::from_le_bytes(<[u8; 4]>::try_from(header).map_err(|_| {
DistributionError::InsufficientCovers {
needed: 4,
got: data.len(),
}
})?) as usize;
let mut offset: usize = 4;
let mut payloads = Vec::with_capacity(count);
for _ in 0..count {
let len_slice = data.get(offset..offset.strict_add(4)).ok_or_else(|| {
DistributionError::InsufficientCovers {
needed: offset.strict_add(4),
got: data.len(),
}
})?;
let len = u32::from_le_bytes(<[u8; 4]>::try_from(len_slice).map_err(|_| {
DistributionError::InsufficientCovers {
needed: offset.strict_add(4),
got: data.len(),
}
})?) as usize;
offset = offset.strict_add(4);
let payload_slice = data.get(offset..offset.strict_add(len)).ok_or_else(|| {
DistributionError::InsufficientCovers {
needed: offset.strict_add(len),
got: data.len(),
}
})?;
payloads.push(Payload::from_bytes(payload_slice.to_vec()));
offset = offset.strict_add(len);
}
Ok(payloads)
}
#[cfg(test)]
mod tests {
use super::*;
type TestResult = Result<(), Box<dyn std::error::Error>>;
#[test]
fn validate_cover_count_one_to_one_needs_one() {
let pattern = DistributionPattern::OneToOne;
assert!(validate_cover_count(&pattern, 1).is_ok());
assert!(validate_cover_count(&pattern, 0).is_err());
}
#[test]
fn validate_cover_count_one_to_many() {
let pattern = DistributionPattern::OneToMany {
data_shards: 5,
parity_shards: 3,
};
assert!(validate_cover_count(&pattern, 8).is_ok());
assert!(validate_cover_count(&pattern, 7).is_err());
}
#[test]
fn validate_cover_count_many_to_one() {
let pattern = DistributionPattern::ManyToOne;
assert!(validate_cover_count(&pattern, 1).is_ok());
}
#[test]
fn validate_cover_count_many_to_many() {
let pattern = DistributionPattern::ManyToMany {
mode: ManyToManyMode::Replicate,
};
assert!(validate_cover_count(&pattern, 2).is_ok());
assert!(validate_cover_count(&pattern, 1).is_err());
}
#[test]
fn assign_one_to_many_round_robin() {
let assignments = assign_one_to_many(6, 3);
assert_eq!(
assignments,
vec![(0, 0), (1, 1), (2, 2), (3, 0), (4, 1), (5, 2)]
);
}
#[test]
fn assign_many_to_many_replicate() {
let assignments = assign_many_to_many(ManyToManyMode::Replicate, 2, 3, 0);
assert_eq!(assignments, vec![vec![0, 1, 2], vec![0, 1, 2]]);
}
#[test]
fn assign_many_to_many_stripe() {
let assignments = assign_many_to_many(ManyToManyMode::Stripe, 4, 3, 0);
assert_eq!(assignments, vec![vec![0], vec![1], vec![2], vec![0]]);
}
#[test]
fn assign_many_to_many_diagonal() {
let assignments = assign_many_to_many(ManyToManyMode::Diagonal, 3, 3, 0);
assert_eq!(assignments, vec![vec![0, 1], vec![1, 2], vec![2, 0]]);
}
#[test]
fn assign_many_to_many_random_deterministic() {
let a1 = assign_many_to_many(ManyToManyMode::Random, 5, 3, 42);
let a2 = assign_many_to_many(ManyToManyMode::Random, 5, 3, 42);
assert_eq!(a1, a2);
}
#[test]
fn pack_unpack_round_trip() -> TestResult {
let payloads = vec![
Payload::from_bytes(b"hello".to_vec()),
Payload::from_bytes(b"world".to_vec()),
Payload::from_bytes(b"!".to_vec()),
];
let packed = pack_many_payloads(&payloads);
let unpacked = unpack_many_payloads(&packed)?;
assert_eq!(unpacked.len(), 3);
assert_eq!(
unpacked.first().ok_or("index out of bounds")?.as_bytes(),
b"hello"
);
assert_eq!(
unpacked.get(1).ok_or("index out of bounds")?.as_bytes(),
b"world"
);
assert_eq!(
unpacked.get(2).ok_or("index out of bounds")?.as_bytes(),
b"!"
);
Ok(())
}
#[test]
fn unpack_empty_buffer_errors() {
assert!(unpack_many_payloads(&[]).is_err());
}
#[test]
fn unpack_truncated_buffer_errors() {
let payloads = vec![Payload::from_bytes(b"test".to_vec())];
let mut packed = pack_many_payloads(&payloads);
packed.truncate(packed.len().strict_sub(2)); assert!(unpack_many_payloads(&packed).is_err());
}
#[test]
fn minimum_covers_values() {
assert_eq!(minimum_covers(&DistributionPattern::OneToOne), 1);
assert_eq!(minimum_covers(&DistributionPattern::ManyToOne), 1);
assert_eq!(
minimum_covers(&DistributionPattern::OneToMany {
data_shards: 10,
parity_shards: 5,
}),
15
);
assert_eq!(
minimum_covers(&DistributionPattern::ManyToMany {
mode: ManyToManyMode::Stripe,
}),
2
);
}
#[test]
fn assign_many_to_many_random_different_seeds_differ() {
let a1 = assign_many_to_many(ManyToManyMode::Random, 10, 5, 1);
let a2 = assign_many_to_many(ManyToManyMode::Random, 10, 5, 2);
assert_ne!(
a1, a2,
"different seeds should produce different assignments"
);
}
#[test]
fn pack_empty_payloads() -> TestResult {
let packed = pack_many_payloads(&[]);
let unpacked = unpack_many_payloads(&packed)?;
assert!(unpacked.is_empty());
Ok(())
}
#[test]
fn assign_one_to_many_single_cover() {
let assignments = assign_one_to_many(3, 1);
assert_eq!(assignments, vec![(0, 0), (1, 0), (2, 0)]);
}
#[test]
fn diagonal_single_cover_no_secondary() {
let assignments = assign_many_to_many(ManyToManyMode::Diagonal, 2, 1, 0);
assert_eq!(assignments, vec![vec![0], vec![0]]);
}
#[test]
fn unpack_truncated_at_length_prefix() {
let data: Vec<u8> = vec![1, 0, 0, 0];
assert!(unpack_many_payloads(&data).is_err());
}
}