// fsqlite_core/source_block_partition.rs
//! §3.1 RaptorQ Source Block Partitioning for Large Databases (bd-1hi.6).
//!
//! Databases larger than a single RaptorQ source block (K_max = 56,403
//! source symbols) must be partitioned into multiple contiguous blocks.
//! This module implements the RFC 6330 §4.4.1 partitioning algorithm
//! adapted for page-level encoding.
7
8use fsqlite_error::{FrankenError, Result};
9use tracing::{debug, info};
10
11const BEAD_ID: &str = "bd-1hi.6";
12
13/// RFC 6330 maximum source symbols per source block.
14pub const K_MAX: u32 = 56_403;
15
16/// RFC 6330 bounds Source Block Number to 8 bits.
17pub const SBN_MAX: u8 = 255;
18
19/// Maximum total pages that can be partitioned (K_MAX * 256).
20pub const MAX_PARTITIONABLE_PAGES: u64 = K_MAX as u64 * 256;
21
22/// A contiguous range of database pages forming one RaptorQ source block.
23#[derive(Debug, Clone, Copy, PartialEq, Eq)]
24pub struct SourceBlock {
25    /// 0-based block index (Source Block Number). Fits in `u8` (max 255).
26    pub index: u8,
27    /// 1-based first page number in this block.
28    pub start_page: u32,
29    /// Number of source symbols (pages) in this block. Always <= `K_MAX`.
30    pub num_pages: u32,
31}
32
33/// Partition `total_pages` database pages into source blocks per RFC 6330 §4.4.1.
34///
35/// Pages are 1-based (page 1 is always in the first source block).
36///
37/// # Errors
38///
39/// Returns `FrankenError::OutOfRange` if `total_pages` exceeds `MAX_PARTITIONABLE_PAGES`
40/// (i.e. would require more than 256 source blocks).
41pub fn partition_source_blocks(total_pages: u32) -> Result<Vec<SourceBlock>> {
42    if total_pages == 0 {
43        debug!(bead_id = BEAD_ID, "empty database, no source blocks");
44        return Ok(Vec::new());
45    }
46
47    let p = u64::from(total_pages);
48
49    if p > MAX_PARTITIONABLE_PAGES {
50        return Err(FrankenError::OutOfRange {
51            what: "total_pages".to_owned(),
52            value: total_pages.to_string(),
53        });
54    }
55
56    if total_pages <= K_MAX {
57        info!(
58            bead_id = BEAD_ID,
59            total_pages, "single source block covers entire database"
60        );
61        return Ok(vec![SourceBlock {
62            index: 0,
63            start_page: 1,
64            num_pages: total_pages,
65        }]);
66    }
67
68    // Multiple source blocks needed.
69    // Partition P pages into Z blocks as evenly as possible.
70    // RFC 6330 §4.4.1: Z_L blocks of K_L symbols, Z_S blocks of K_S symbols.
71    let z = total_pages.div_ceil(K_MAX);
72    let k_l = total_pages.div_ceil(z);
73    let k_s = total_pages / z;
74    let z_l = total_pages - k_s * z;
75    let z_s = z - z_l;
76
77    info!(
78        bead_id = BEAD_ID,
79        total_pages, z, k_l, k_s, z_l, z_s, "partitioned database into multiple source blocks"
80    );
81
82    let mut blocks = Vec::with_capacity(usize::try_from(z).unwrap_or(256));
83    let mut offset: u32 = 1; // 1-based page numbers
84
85    for i in 0..z_l {
86        let idx = u8::try_from(i).expect("SBN checked by MAX_PARTITIONABLE_PAGES guard");
87        blocks.push(SourceBlock {
88            index: idx,
89            start_page: offset,
90            num_pages: k_l,
91        });
92        offset = offset
93            .checked_add(k_l)
94            .expect("offset overflow checked by MAX_PARTITIONABLE_PAGES guard");
95    }
96
97    for i in 0..z_s {
98        let idx = u8::try_from(z_l + i).expect("SBN checked by MAX_PARTITIONABLE_PAGES guard");
99        blocks.push(SourceBlock {
100            index: idx,
101            start_page: offset,
102            num_pages: k_s,
103        });
104        offset = offset
105            .checked_add(k_s)
106            .expect("offset overflow checked by MAX_PARTITIONABLE_PAGES guard");
107    }
108
109    debug_assert_eq!(
110        offset,
111        total_pages + 1,
112        "bead_id={BEAD_ID} partition coverage mismatch"
113    );
114
115    Ok(blocks)
116}
117
118#[cfg(test)]
119mod tests {
120    use super::*;
121
122    const TEST_BEAD_ID: &str = "bd-1hi.6";
123
124    // -----------------------------------------------------------------------
125    // Unit tests from spec comment: §3.1 Source Block Partitioning
126    // -----------------------------------------------------------------------
127
128    #[test]
129    fn test_partition_empty_db() {
130        let blocks = partition_source_blocks(0).expect("empty db should succeed");
131        assert!(
132            blocks.is_empty(),
133            "bead_id={TEST_BEAD_ID} case=empty_db_no_blocks"
134        );
135    }
136
137    #[test]
138    fn test_partition_single_page() {
139        let blocks = partition_source_blocks(1).expect("single page should succeed");
140        assert_eq!(
141            blocks.len(),
142            1,
143            "bead_id={TEST_BEAD_ID} case=single_page_one_block"
144        );
145        assert_eq!(blocks[0].index, 0);
146        assert_eq!(blocks[0].start_page, 1);
147        assert_eq!(blocks[0].num_pages, 1);
148    }
149
150    #[test]
151    fn test_partition_small_db() {
152        // test_partition_small_db: 64-page DB → 1 source block. K=64.
153        let blocks = partition_source_blocks(64).expect("64 pages should succeed");
154        assert_eq!(
155            blocks.len(),
156            1,
157            "bead_id={TEST_BEAD_ID} case=small_db_single_block"
158        );
159        assert_eq!(blocks[0].index, 0);
160        assert_eq!(blocks[0].start_page, 1);
161        assert_eq!(blocks[0].num_pages, 64);
162    }
163
164    #[test]
165    fn test_partition_small_db_100() {
166        // From bead spec: Single block: P=100
167        let blocks = partition_source_blocks(100).expect("100 pages should succeed");
168        assert_eq!(
169            blocks.len(),
170            1,
171            "bead_id={TEST_BEAD_ID} case=single_block_p100"
172        );
173        assert_eq!(blocks[0].start_page, 1);
174        assert_eq!(blocks[0].num_pages, 100);
175    }
176
177    #[test]
178    fn test_partition_boundary_exactly_k_max() {
179        // P=56403 (exactly K_max), verify single block
180        let blocks = partition_source_blocks(K_MAX).expect("exactly K_MAX should succeed");
181        assert_eq!(
182            blocks.len(),
183            1,
184            "bead_id={TEST_BEAD_ID} case=boundary_exactly_k_max"
185        );
186        assert_eq!(blocks[0].index, 0);
187        assert_eq!(blocks[0].start_page, 1);
188        assert_eq!(blocks[0].num_pages, K_MAX);
189    }
190
191    #[test]
192    fn test_partition_two_blocks() {
193        // P=56404 (just over K_max), verify 2 blocks with correct sizes
194        let p = K_MAX + 1;
195        let blocks = partition_source_blocks(p).expect("K_MAX+1 should succeed");
196        assert_eq!(blocks.len(), 2, "bead_id={TEST_BEAD_ID} case=two_blocks");
197
198        // Z = ceil(56404/56403) = 2
199        // K_L = ceil(56404/2) = 28202
200        // K_S = floor(56404/2) = 28202
201        // Z_L = 56404 - 28202*2 = 0
202        // Z_S = 2 - 0 = 2
203        // Both blocks have 28202 pages
204        assert_eq!(blocks[0].index, 0);
205        assert_eq!(blocks[0].start_page, 1);
206        assert_eq!(blocks[1].index, 1);
207
208        // Total coverage
209        let total: u32 = blocks.iter().map(|b| b.num_pages).sum();
210        assert_eq!(total, p, "bead_id={TEST_BEAD_ID} case=two_blocks_coverage");
211
212        // Both blocks within K_MAX
213        for block in &blocks {
214            assert!(
215                block.num_pages <= K_MAX,
216                "bead_id={TEST_BEAD_ID} case=two_blocks_k_max block={}",
217                block.index
218            );
219        }
220
221        // Contiguous
222        assert_eq!(
223            blocks[1].start_page,
224            blocks[0].start_page + blocks[0].num_pages
225        );
226    }
227
228    #[test]
229    fn test_partition_large_db_worked_example() {
230        // From spec: 1GB database with 4096-byte pages = 262,144 pages
231        let p = 262_144_u32;
232        let blocks = partition_source_blocks(p).expect("large db should succeed");
233
234        // Z = ceil(262144/56403) = 5 source blocks
235        assert_eq!(
236            blocks.len(),
237            5,
238            "bead_id={TEST_BEAD_ID} case=large_db_5_blocks"
239        );
240
241        // K_L = ceil(262144/5) = 52429
242        // K_S = floor(262144/5) = 52428
243        // Z_L = 262144 - 52428*5 = 262144 - 262140 = 4 blocks of 52429 pages
244        // Z_S = 5 - 4 = 1 block of 52428 pages
245        let expected_k_l = 52_429_u32;
246        let expected_k_s = 52_428_u32;
247
248        for block in &blocks[..4] {
249            assert_eq!(
250                block.num_pages, expected_k_l,
251                "bead_id={TEST_BEAD_ID} case=large_db_k_l_block block={}",
252                block.index
253            );
254        }
255        assert_eq!(
256            blocks[4].num_pages, expected_k_s,
257            "bead_id={TEST_BEAD_ID} case=large_db_k_s_block"
258        );
259
260        // Verify exact block boundaries from spec
261        assert_eq!(blocks[0].start_page, 1);
262        assert_eq!(blocks[0].start_page + blocks[0].num_pages - 1, 52_429);
263        assert_eq!(blocks[1].start_page, 52_430);
264        assert_eq!(blocks[1].start_page + blocks[1].num_pages - 1, 104_858);
265        assert_eq!(blocks[2].start_page, 104_859);
266        assert_eq!(blocks[2].start_page + blocks[2].num_pages - 1, 157_287);
267        assert_eq!(blocks[3].start_page, 157_288);
268        assert_eq!(blocks[3].start_page + blocks[3].num_pages - 1, 209_716);
269        assert_eq!(blocks[4].start_page, 209_717);
270        assert_eq!(blocks[4].start_page + blocks[4].num_pages - 1, 262_144);
271
272        // Total coverage
273        let total: u32 = blocks.iter().map(|b| b.num_pages).sum();
274        assert_eq!(
275            total, p,
276            "bead_id={TEST_BEAD_ID} case=large_db_total_coverage"
277        );
278
279        // Sequential indices
280        for (i, block) in blocks.iter().enumerate() {
281            assert_eq!(
282                block.index,
283                u8::try_from(i).unwrap(),
284                "bead_id={TEST_BEAD_ID} case=large_db_sequential_indices"
285            );
286        }
287    }
288
289    #[test]
290    fn test_partition_large_db_10000() {
291        // test_partition_large_db: 10,000-page DB → multiple source blocks
292        let blocks = partition_source_blocks(10_000).expect("10k pages should succeed");
293        // 10,000 < K_MAX (56,403), so this is a single block
294        assert_eq!(
295            blocks.len(),
296            1,
297            "bead_id={TEST_BEAD_ID} case=large_db_10k_single_block"
298        );
299        assert_eq!(blocks[0].num_pages, 10_000);
300    }
301
302    #[test]
303    fn test_partition_uneven() {
304        // test_partition_uneven: DB size not evenly divisible
305        let p = K_MAX * 3 + 7; // 169,216 pages
306        let blocks = partition_source_blocks(p).expect("uneven split should succeed");
307
308        // Z = ceil(169216/56403) = 4 (not 3, since 169216 > 56403*3 = 169209)
309        // Actually: 56403 * 3 = 169209, and 169216 > 169209, so Z = ceil(169216/56403) = 4
310
311        // Total coverage
312        let total: u32 = blocks.iter().map(|b| b.num_pages).sum();
313        assert_eq!(
314            total, p,
315            "bead_id={TEST_BEAD_ID} case=uneven_total_coverage"
316        );
317
318        // All blocks within K_MAX
319        for block in &blocks {
320            assert!(
321                block.num_pages <= K_MAX,
322                "bead_id={TEST_BEAD_ID} case=uneven_k_max block={}",
323                block.index
324            );
325        }
326
327        // Sizes differ by at most 1
328        let max_k = blocks.iter().map(|b| b.num_pages).max().unwrap();
329        let min_k = blocks.iter().map(|b| b.num_pages).min().unwrap();
330        assert!(
331            max_k - min_k <= 1,
332            "bead_id={TEST_BEAD_ID} case=uneven_balanced max_k={max_k} min_k={min_k}"
333        );
334
335        // Contiguous
336        for window in blocks.windows(2) {
337            assert_eq!(
338                window[1].start_page,
339                window[0].start_page + window[0].num_pages,
340                "bead_id={TEST_BEAD_ID} case=uneven_contiguous"
341            );
342        }
343    }
344
345    #[test]
346    fn test_partition_page1_special() {
347        // Page 1 (header) always in first source block
348        for p in [1_u32, 64, K_MAX, K_MAX + 1, 262_144] {
349            let blocks = partition_source_blocks(p).expect("partition should succeed");
350            assert!(
351                !blocks.is_empty(),
352                "bead_id={TEST_BEAD_ID} case=page1_nonempty p={p}"
353            );
354            assert_eq!(
355                blocks[0].start_page, 1,
356                "bead_id={TEST_BEAD_ID} case=page1_in_first_block p={p}"
357            );
358        }
359    }
360
361    #[test]
362    fn test_partition_maximum_blocks() {
363        // P = K_MAX * 256, verify 256 blocks (SBN boundary)
364        let p_u64 = u64::from(K_MAX) * 256;
365        assert!(
366            u32::try_from(p_u64).is_ok(),
367            "test precondition: fits in u32"
368        );
369        let p = u32::try_from(p_u64).unwrap();
370        let blocks = partition_source_blocks(p).expect("max blocks should succeed");
371        assert_eq!(
372            blocks.len(),
373            256,
374            "bead_id={TEST_BEAD_ID} case=max_blocks_256"
375        );
376
377        // Each block has exactly K_MAX pages
378        for block in &blocks {
379            assert_eq!(
380                block.num_pages, K_MAX,
381                "bead_id={TEST_BEAD_ID} case=max_blocks_each_k_max block={}",
382                block.index
383            );
384        }
385
386        // Last block index is 255
387        assert_eq!(blocks[255].index, 255);
388
389        // Total coverage
390        let total: u64 = blocks.iter().map(|b| u64::from(b.num_pages)).sum();
391        assert_eq!(
392            total, p_u64,
393            "bead_id={TEST_BEAD_ID} case=max_blocks_coverage"
394        );
395    }
396
397    #[test]
398    fn test_partition_overflow_too_many_pages() {
399        // P > K_MAX * 256 must error
400        let p_u64 = u64::from(K_MAX) * 256 + 1;
401        if let Ok(p) = u32::try_from(p_u64) {
402            let result = partition_source_blocks(p);
403            assert!(
404                result.is_err(),
405                "bead_id={TEST_BEAD_ID} case=overflow_error p={p}"
406            );
407        }
408        // If p_u64 doesn't fit in u32, the overflow is caught at the type level
409    }
410
411    // -----------------------------------------------------------------------
412    // Property tests
413    // -----------------------------------------------------------------------
414
415    #[test]
416    fn prop_partition_coverage() {
417        // For a range of DB sizes, total pages across blocks == P
418        for p in [
419            1_u32,
420            2,
421            63,
422            64,
423            100,
424            1000,
425            K_MAX - 1,
426            K_MAX,
427            K_MAX + 1,
428            K_MAX * 2,
429            262_144,
430            K_MAX * 100,
431        ] {
432            let blocks = partition_source_blocks(p).expect("partition should succeed");
433            let total: u32 = blocks.iter().map(|b| b.num_pages).sum();
434            assert_eq!(total, p, "bead_id={TEST_BEAD_ID} case=prop_coverage p={p}");
435        }
436    }
437
438    #[test]
439    fn prop_partition_deterministic() {
440        // Same input always produces same output
441        for p in [1_u32, K_MAX, K_MAX + 1, 262_144] {
442            let a = partition_source_blocks(p).expect("first call");
443            let b = partition_source_blocks(p).expect("second call");
444            assert_eq!(a, b, "bead_id={TEST_BEAD_ID} case=prop_deterministic p={p}");
445        }
446    }
447
448    #[test]
449    fn prop_partition_k_max_bounds() {
450        // Every block has num_pages <= K_MAX
451        for p in [K_MAX + 1, K_MAX * 2, K_MAX * 3 + 7, 262_144, K_MAX * 100] {
452            let blocks = partition_source_blocks(p).expect("partition should succeed");
453            for block in &blocks {
454                assert!(
455                    block.num_pages <= K_MAX,
456                    "bead_id={TEST_BEAD_ID} case=prop_k_max_bound p={p} block={} num_pages={}",
457                    block.index,
458                    block.num_pages
459                );
460            }
461        }
462    }
463
464    #[test]
465    fn prop_partition_contiguous_non_overlapping() {
466        // Blocks are contiguous and non-overlapping
467        for p in [K_MAX + 1, K_MAX * 5, 262_144] {
468            let blocks = partition_source_blocks(p).expect("partition should succeed");
469            for window in blocks.windows(2) {
470                let end_prev = window[0].start_page + window[0].num_pages;
471                assert_eq!(
472                    window[1].start_page, end_prev,
473                    "bead_id={TEST_BEAD_ID} case=prop_contiguous p={p}"
474                );
475            }
476            // First block starts at page 1
477            assert_eq!(blocks[0].start_page, 1);
478            // Last block ends at page P
479            let last = blocks.last().unwrap();
480            assert_eq!(last.start_page + last.num_pages - 1, p);
481        }
482    }
483
484    #[test]
485    fn prop_partition_sequential_indices() {
486        for p in [K_MAX + 1, K_MAX * 3, 262_144] {
487            let blocks = partition_source_blocks(p).expect("partition should succeed");
488            for (i, block) in blocks.iter().enumerate() {
489                assert_eq!(
490                    block.index,
491                    u8::try_from(i).unwrap(),
492                    "bead_id={TEST_BEAD_ID} case=prop_sequential_indices p={p}"
493                );
494            }
495        }
496    }
497
498    // -----------------------------------------------------------------------
499    // Compliance tests
500    // -----------------------------------------------------------------------
501
502    #[test]
503    fn test_bd_1hi_6_unit_compliance_gate() {
504        // Verify section-specific unit validation hooks are wired.
505        assert_eq!(K_MAX, 56_403, "RFC 6330 K_max constant");
506        assert_eq!(SBN_MAX, 255, "RFC 6330 SBN_max constant");
507        assert_eq!(
508            MAX_PARTITIONABLE_PAGES,
509            56_403 * 256,
510            "max partitionable pages"
511        );
512
513        // Verify the function exists and is callable
514        let _ = partition_source_blocks(0);
515        let _ = partition_source_blocks(1);
516        let _ = partition_source_blocks(K_MAX);
517    }
518
519    #[test]
520    fn prop_bd_1hi_6_structure_compliance() {
521        // Property check that required structure blocks are present.
522        // For any valid P, blocks form a valid partition.
523        let test_sizes = [
524            1_u32,
525            100,
526            K_MAX,
527            K_MAX + 1,
528            K_MAX * 2,
529            K_MAX * 100,
530            K_MAX * 256,
531        ];
532        for &p in &test_sizes {
533            let blocks = partition_source_blocks(p).expect("valid partition");
534            // Coverage
535            let total: u32 = blocks.iter().map(|b| b.num_pages).sum();
536            assert_eq!(total, p);
537            // Bounds
538            for b in &blocks {
539                assert!(b.num_pages <= K_MAX);
540                assert!(b.num_pages > 0);
541            }
542            // Sequential
543            for (i, b) in blocks.iter().enumerate() {
544                assert_eq!(b.index, u8::try_from(i).unwrap());
545            }
546        }
547    }
548}