Skip to main content

shadowforge_lib/domain/distribution/
mod.rs

1//! Four distribution patterns: 1:1, 1:N, N:1, N:M.
2//!
3//! Pure domain logic — no I/O, no file system, no async runtime.
4//! The adapter layer is responsible for parallel execution (rayon).
5
6use crate::domain::errors::DistributionError;
7use crate::domain::types::{DistributionPattern, ManyToManyMode, Payload};
8
9/// Validate that the cover count satisfies the distribution pattern.
10///
11/// # Errors
12/// Returns [`DistributionError::InsufficientCovers`] if too few covers.
13pub const fn validate_cover_count(
14    pattern: &DistributionPattern,
15    cover_count: usize,
16) -> Result<(), DistributionError> {
17    let needed = minimum_covers(pattern);
18    if cover_count < needed {
19        return Err(DistributionError::InsufficientCovers {
20            needed,
21            got: cover_count,
22        });
23    }
24    Ok(())
25}
26
27/// Minimum number of covers required for a given pattern.
28#[must_use]
29pub const fn minimum_covers(pattern: &DistributionPattern) -> usize {
30    match pattern {
31        DistributionPattern::OneToOne | DistributionPattern::ManyToOne => 1,
32        DistributionPattern::OneToMany {
33            data_shards,
34            parity_shards,
35        } => {
36            // Need at least data_shards + parity_shards covers
37            (*data_shards as usize).strict_add(*parity_shards as usize)
38        }
39        DistributionPattern::ManyToMany { .. } => 2,
40    }
41}
42
/// Assign shards to covers for 1:N distribution.
///
/// Shards are dealt out round-robin: shard `i` goes to cover
/// `i % cover_count`, so a cover receives several shards whenever
/// `shard_count > cover_count`.
///
/// Returns a `Vec<(shard_index, cover_index)>` mapping with one entry per
/// shard, in shard order. With `cover_count == 0` there is nowhere to place
/// a shard, so the mapping is empty (instead of panicking on a division by
/// zero).
#[must_use]
pub fn assign_one_to_many(shard_count: usize, cover_count: usize) -> Vec<(usize, usize)> {
    // Guard the modulo below: `x % 0` panics.
    if cover_count == 0 {
        return Vec::new();
    }
    (0..shard_count)
        .map(|shard| (shard, shard % cover_count))
        .collect()
}
50
51/// Assign shards to covers for M:N (many-to-many) distribution.
52///
53/// Returns a `Vec<Vec<usize>>` where outer index = shard and inner = cover indices.
54#[must_use]
55pub fn assign_many_to_many(
56    mode: ManyToManyMode,
57    shard_count: usize,
58    cover_count: usize,
59    seed: u64,
60) -> Vec<Vec<usize>> {
61    match mode {
62        ManyToManyMode::Replicate => {
63            // Every shard goes to every cover
64            let all_covers: Vec<usize> = (0..cover_count).collect();
65            (0..shard_count).map(|_| all_covers.clone()).collect()
66        }
67        ManyToManyMode::Stripe => {
68            // Round-robin stripe across covers
69            (0..shard_count).map(|i| vec![i % cover_count]).collect()
70        }
71        ManyToManyMode::Diagonal => {
72            // Diagonal assignment across the matrix
73            (0..shard_count)
74                .map(|i| {
75                    let primary = i % cover_count;
76                    let secondary = (i.strict_add(1)) % cover_count;
77                    if primary == secondary {
78                        vec![primary]
79                    } else {
80                        vec![primary, secondary]
81                    }
82                })
83                .collect()
84        }
85        ManyToManyMode::Random => {
86            // Deterministic pseudo-random assignment using LCG
87            let mut state = seed;
88            (0..shard_count)
89                .map(|_| {
90                    state = state
91                        .wrapping_mul(6_364_136_223_846_793_005)
92                        .wrapping_add(1_442_695_040_888_963_407);
93                    let idx = (state >> 33) as usize % cover_count;
94                    vec![idx]
95                })
96                .collect()
97        }
98    }
99}
100
101/// Build a concatenated multi-payload with length-prefix manifest for N:1.
102///
103/// Format: `[count:4][len_0:4][data_0][len_1:4][data_1]...`
104#[must_use]
105pub fn pack_many_payloads(payloads: &[Payload]) -> Vec<u8> {
106    let mut buf = Vec::new();
107    #[expect(
108        clippy::cast_possible_truncation,
109        reason = "payload count bounded well below u32::MAX"
110    )]
111    let count = payloads.len() as u32;
112    buf.extend_from_slice(&count.to_le_bytes());
113    for p in payloads {
114        #[expect(
115            clippy::cast_possible_truncation,
116            reason = "individual payload size bounded below u32::MAX"
117        )]
118        let len = p.len() as u32;
119        buf.extend_from_slice(&len.to_le_bytes());
120        buf.extend_from_slice(p.as_bytes());
121    }
122    buf
123}
124
125/// Unpack a multi-payload buffer produced by [`pack_many_payloads`].
126///
127/// # Errors
128/// Returns [`DistributionError::InsufficientCovers`] (repurposed) if the
129/// buffer is truncated.
130pub fn unpack_many_payloads(data: &[u8]) -> Result<Vec<Payload>, DistributionError> {
131    let header = data.get(..4).ok_or(DistributionError::InsufficientCovers {
132        needed: 4,
133        got: data.len(),
134    })?;
135    let count = u32::from_le_bytes(<[u8; 4]>::try_from(header).map_err(|_| {
136        DistributionError::InsufficientCovers {
137            needed: 4,
138            got: data.len(),
139        }
140    })?) as usize;
141    let mut offset: usize = 4;
142    let mut payloads = Vec::with_capacity(count);
143    for _ in 0..count {
144        let len_slice = data.get(offset..offset.strict_add(4)).ok_or_else(|| {
145            DistributionError::InsufficientCovers {
146                needed: offset.strict_add(4),
147                got: data.len(),
148            }
149        })?;
150        let len = u32::from_le_bytes(<[u8; 4]>::try_from(len_slice).map_err(|_| {
151            DistributionError::InsufficientCovers {
152                needed: offset.strict_add(4),
153                got: data.len(),
154            }
155        })?) as usize;
156        offset = offset.strict_add(4);
157        let payload_slice = data.get(offset..offset.strict_add(len)).ok_or_else(|| {
158            DistributionError::InsufficientCovers {
159                needed: offset.strict_add(len),
160                got: data.len(),
161            }
162        })?;
163        payloads.push(Payload::from_bytes(payload_slice.to_vec()));
164        offset = offset.strict_add(len);
165    }
166    Ok(payloads)
167}
168
#[cfg(test)]
mod tests {
    use super::*;

    /// Lets fallible tests propagate errors with `?`.
    type TestResult = Result<(), Box<dyn std::error::Error>>;

    // ---- validate_cover_count / minimum_covers ----

    #[test]
    fn validate_cover_count_one_to_one_needs_one() {
        let one_to_one = DistributionPattern::OneToOne;
        assert!(validate_cover_count(&one_to_one, 0).is_err());
        assert!(validate_cover_count(&one_to_one, 1).is_ok());
    }

    #[test]
    fn validate_cover_count_one_to_many() {
        // 5 data + 3 parity shards => 8 covers required.
        let one_to_many = DistributionPattern::OneToMany {
            data_shards: 5,
            parity_shards: 3,
        };
        assert!(validate_cover_count(&one_to_many, 7).is_err());
        assert!(validate_cover_count(&one_to_many, 8).is_ok());
    }

    #[test]
    fn validate_cover_count_many_to_one() {
        assert!(validate_cover_count(&DistributionPattern::ManyToOne, 1).is_ok());
    }

    #[test]
    fn validate_cover_count_many_to_many() {
        let many_to_many = DistributionPattern::ManyToMany {
            mode: ManyToManyMode::Replicate,
        };
        assert!(validate_cover_count(&many_to_many, 1).is_err());
        assert!(validate_cover_count(&many_to_many, 2).is_ok());
    }

    #[test]
    fn minimum_covers_values() {
        let one_to_many = DistributionPattern::OneToMany {
            data_shards: 10,
            parity_shards: 5,
        };
        let many_to_many = DistributionPattern::ManyToMany {
            mode: ManyToManyMode::Stripe,
        };
        assert_eq!(minimum_covers(&DistributionPattern::OneToOne), 1);
        assert_eq!(minimum_covers(&DistributionPattern::ManyToOne), 1);
        assert_eq!(minimum_covers(&one_to_many), 15);
        assert_eq!(minimum_covers(&many_to_many), 2);
    }

    // ---- assign_one_to_many ----

    #[test]
    fn assign_one_to_many_round_robin() {
        let mapping = assign_one_to_many(6, 3);
        let expected = vec![(0, 0), (1, 1), (2, 2), (3, 0), (4, 1), (5, 2)];
        assert_eq!(mapping, expected);
    }

    #[test]
    fn assign_one_to_many_single_cover() {
        // A lone cover receives every shard.
        let mapping = assign_one_to_many(3, 1);
        assert_eq!(mapping, vec![(0, 0), (1, 0), (2, 0)]);
    }

    // ---- assign_many_to_many ----

    #[test]
    fn assign_many_to_many_replicate() {
        let per_shard = assign_many_to_many(ManyToManyMode::Replicate, 2, 3, 0);
        assert_eq!(per_shard, vec![vec![0, 1, 2]; 2]);
    }

    #[test]
    fn assign_many_to_many_stripe() {
        let per_shard = assign_many_to_many(ManyToManyMode::Stripe, 4, 3, 0);
        let expected = vec![vec![0], vec![1], vec![2], vec![0]];
        assert_eq!(per_shard, expected);
    }

    #[test]
    fn assign_many_to_many_diagonal() {
        let per_shard = assign_many_to_many(ManyToManyMode::Diagonal, 3, 3, 0);
        // Each shard takes its own cover and the next one, wrapping at the end.
        let expected = vec![vec![0, 1], vec![1, 2], vec![2, 0]];
        assert_eq!(per_shard, expected);
    }

    #[test]
    fn diagonal_single_cover_no_secondary() {
        let per_shard = assign_many_to_many(ManyToManyMode::Diagonal, 2, 1, 0);
        // Primary and secondary coincide with one cover, so it is listed once.
        assert_eq!(per_shard, vec![vec![0], vec![0]]);
    }

    #[test]
    fn assign_many_to_many_random_deterministic() {
        let first_run = assign_many_to_many(ManyToManyMode::Random, 5, 3, 42);
        let second_run = assign_many_to_many(ManyToManyMode::Random, 5, 3, 42);
        assert_eq!(first_run, second_run);
    }

    #[test]
    fn assign_many_to_many_random_different_seeds_differ() {
        let with_seed_one = assign_many_to_many(ManyToManyMode::Random, 10, 5, 1);
        let with_seed_two = assign_many_to_many(ManyToManyMode::Random, 10, 5, 2);
        assert_ne!(
            with_seed_one, with_seed_two,
            "different seeds should produce different assignments"
        );
    }

    // ---- pack_many_payloads / unpack_many_payloads ----

    #[test]
    fn pack_unpack_round_trip() -> TestResult {
        let messages: [&[u8]; 3] = [b"hello", b"world", b"!"];
        let originals: Vec<Payload> = messages
            .iter()
            .map(|message| Payload::from_bytes(message.to_vec()))
            .collect();

        let restored = unpack_many_payloads(&pack_many_payloads(&originals))?;

        assert_eq!(restored.len(), 3);
        for (payload, expected) in restored.iter().zip(messages) {
            assert_eq!(payload.as_bytes(), expected);
        }
        Ok(())
    }

    #[test]
    fn pack_empty_payloads() -> TestResult {
        let restored = unpack_many_payloads(&pack_many_payloads(&[]))?;
        assert!(restored.is_empty());
        Ok(())
    }

    #[test]
    fn unpack_empty_buffer_errors() {
        let outcome = unpack_many_payloads(&[]);
        assert!(outcome.is_err());
    }

    #[test]
    fn unpack_truncated_buffer_errors() {
        let mut packed = pack_many_payloads(&[Payload::from_bytes(b"test".to_vec())]);
        // Chop the last two payload bytes off to corrupt the buffer.
        let shortened_len = packed.len().strict_sub(2);
        packed.truncate(shortened_len);
        assert!(unpack_many_payloads(&packed).is_err());
    }

    #[test]
    fn unpack_truncated_at_length_prefix() {
        // The count header announces one payload, yet no length prefix follows.
        let header_only: Vec<u8> = vec![1, 0, 0, 0];
        assert!(unpack_many_payloads(&header_only).is_err());
    }
}