1use fsqlite_error::{FrankenError, Result};
9use tracing::{debug, info};
10
/// Bead identifier echoed in log events and debug assertions for traceability.
const BEAD_ID: &str = "bd-1hi.6";

/// Maximum number of source symbols (pages) per source block
/// (RFC 6330 `K'_max`; see the unit compliance gate test).
pub const K_MAX: u32 = 56_403;

/// Maximum source block number: the SBN is a single byte, so blocks
/// are numbered 0..=255 (RFC 6330 `SBN_max`).
pub const SBN_MAX: u8 = 255;

/// Largest database (in pages) that can be partitioned: 256 source
/// blocks of `K_MAX` pages each. Anything larger cannot be addressed
/// with a `u8` SBN and is rejected as out of range.
pub const MAX_PARTITIONABLE_PAGES: u64 = K_MAX as u64 * 256;
21
/// A contiguous, non-overlapping run of database pages forming one
/// source block of the partition produced by [`partition_source_blocks`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SourceBlock {
    /// Source block number (SBN): sequential from 0 across the partition.
    pub index: u8,
    /// First page covered by this block (pages are 1-indexed).
    pub start_page: u32,
    /// Number of pages in this block; never 0 and never more than `K_MAX`.
    pub num_pages: u32,
}
32
33pub fn partition_source_blocks(total_pages: u32) -> Result<Vec<SourceBlock>> {
42 if total_pages == 0 {
43 debug!(bead_id = BEAD_ID, "empty database, no source blocks");
44 return Ok(Vec::new());
45 }
46
47 let p = u64::from(total_pages);
48
49 if p > MAX_PARTITIONABLE_PAGES {
50 return Err(FrankenError::OutOfRange {
51 what: "total_pages".to_owned(),
52 value: total_pages.to_string(),
53 });
54 }
55
56 if total_pages <= K_MAX {
57 info!(
58 bead_id = BEAD_ID,
59 total_pages, "single source block covers entire database"
60 );
61 return Ok(vec![SourceBlock {
62 index: 0,
63 start_page: 1,
64 num_pages: total_pages,
65 }]);
66 }
67
68 let z = total_pages.div_ceil(K_MAX);
72 let k_l = total_pages.div_ceil(z);
73 let k_s = total_pages / z;
74 let z_l = total_pages - k_s * z;
75 let z_s = z - z_l;
76
77 info!(
78 bead_id = BEAD_ID,
79 total_pages, z, k_l, k_s, z_l, z_s, "partitioned database into multiple source blocks"
80 );
81
82 let mut blocks = Vec::with_capacity(usize::try_from(z).unwrap_or(256));
83 let mut offset: u32 = 1; for i in 0..z_l {
86 let idx = u8::try_from(i).expect("SBN checked by MAX_PARTITIONABLE_PAGES guard");
87 blocks.push(SourceBlock {
88 index: idx,
89 start_page: offset,
90 num_pages: k_l,
91 });
92 offset = offset
93 .checked_add(k_l)
94 .expect("offset overflow checked by MAX_PARTITIONABLE_PAGES guard");
95 }
96
97 for i in 0..z_s {
98 let idx = u8::try_from(z_l + i).expect("SBN checked by MAX_PARTITIONABLE_PAGES guard");
99 blocks.push(SourceBlock {
100 index: idx,
101 start_page: offset,
102 num_pages: k_s,
103 });
104 offset = offset
105 .checked_add(k_s)
106 .expect("offset overflow checked by MAX_PARTITIONABLE_PAGES guard");
107 }
108
109 debug_assert_eq!(
110 offset,
111 total_pages + 1,
112 "bead_id={BEAD_ID} partition coverage mismatch"
113 );
114
115 Ok(blocks)
116}
117
#[cfg(test)]
mod tests {
    use super::*;

    // Bead identifier echoed in assertion messages for traceability.
    const TEST_BEAD_ID: &str = "bd-1hi.6";

    // 0 pages: no blocks at all.
    #[test]
    fn test_partition_empty_db() {
        let blocks = partition_source_blocks(0).expect("empty db should succeed");
        assert!(
            blocks.is_empty(),
            "bead_id={TEST_BEAD_ID} case=empty_db_no_blocks"
        );
    }

    // 1 page: one block covering exactly page 1.
    #[test]
    fn test_partition_single_page() {
        let blocks = partition_source_blocks(1).expect("single page should succeed");
        assert_eq!(
            blocks.len(),
            1,
            "bead_id={TEST_BEAD_ID} case=single_page_one_block"
        );
        assert_eq!(blocks[0].index, 0);
        assert_eq!(blocks[0].start_page, 1);
        assert_eq!(blocks[0].num_pages, 1);
    }

    // Small databases (well under K_MAX) use a single block.
    #[test]
    fn test_partition_small_db() {
        let blocks = partition_source_blocks(64).expect("64 pages should succeed");
        assert_eq!(
            blocks.len(),
            1,
            "bead_id={TEST_BEAD_ID} case=small_db_single_block"
        );
        assert_eq!(blocks[0].index, 0);
        assert_eq!(blocks[0].start_page, 1);
        assert_eq!(blocks[0].num_pages, 64);
    }

    // Another small-database single-block case.
    #[test]
    fn test_partition_small_db_100() {
        let blocks = partition_source_blocks(100).expect("100 pages should succeed");
        assert_eq!(
            blocks.len(),
            1,
            "bead_id={TEST_BEAD_ID} case=single_block_p100"
        );
        assert_eq!(blocks[0].start_page, 1);
        assert_eq!(blocks[0].num_pages, 100);
    }

    // Exactly K_MAX pages still fits in one block (boundary inclusive).
    #[test]
    fn test_partition_boundary_exactly_k_max() {
        let blocks = partition_source_blocks(K_MAX).expect("exactly K_MAX should succeed");
        assert_eq!(
            blocks.len(),
            1,
            "bead_id={TEST_BEAD_ID} case=boundary_exactly_k_max"
        );
        assert_eq!(blocks[0].index, 0);
        assert_eq!(blocks[0].start_page, 1);
        assert_eq!(blocks[0].num_pages, K_MAX);
    }

    // K_MAX + 1 pages is the smallest input that forces two blocks;
    // the blocks must be contiguous and each within the K_MAX bound.
    #[test]
    fn test_partition_two_blocks() {
        let p = K_MAX + 1;
        let blocks = partition_source_blocks(p).expect("K_MAX+1 should succeed");
        assert_eq!(blocks.len(), 2, "bead_id={TEST_BEAD_ID} case=two_blocks");

        assert_eq!(blocks[0].index, 0);
        assert_eq!(blocks[0].start_page, 1);
        assert_eq!(blocks[1].index, 1);

        let total: u32 = blocks.iter().map(|b| b.num_pages).sum();
        assert_eq!(total, p, "bead_id={TEST_BEAD_ID} case=two_blocks_coverage");

        for block in &blocks {
            assert!(
                block.num_pages <= K_MAX,
                "bead_id={TEST_BEAD_ID} case=two_blocks_k_max block={}",
                block.index
            );
        }

        assert_eq!(
            blocks[1].start_page,
            blocks[0].start_page + blocks[0].num_pages
        );
    }

    // Worked example: p = 262,144 gives z = 5 blocks, the first four
    // of k_l = 52,429 pages and the last of k_s = 52,428 pages, with
    // exact page ranges checked block by block.
    #[test]
    fn test_partition_large_db_worked_example() {
        let p = 262_144_u32;
        let blocks = partition_source_blocks(p).expect("large db should succeed");

        assert_eq!(
            blocks.len(),
            5,
            "bead_id={TEST_BEAD_ID} case=large_db_5_blocks"
        );

        let expected_k_l = 52_429_u32;
        let expected_k_s = 52_428_u32;

        for block in &blocks[..4] {
            assert_eq!(
                block.num_pages, expected_k_l,
                "bead_id={TEST_BEAD_ID} case=large_db_k_l_block block={}",
                block.index
            );
        }
        assert_eq!(
            blocks[4].num_pages, expected_k_s,
            "bead_id={TEST_BEAD_ID} case=large_db_k_s_block"
        );

        // Exact inclusive page ranges for every block.
        assert_eq!(blocks[0].start_page, 1);
        assert_eq!(blocks[0].start_page + blocks[0].num_pages - 1, 52_429);
        assert_eq!(blocks[1].start_page, 52_430);
        assert_eq!(blocks[1].start_page + blocks[1].num_pages - 1, 104_858);
        assert_eq!(blocks[2].start_page, 104_859);
        assert_eq!(blocks[2].start_page + blocks[2].num_pages - 1, 157_287);
        assert_eq!(blocks[3].start_page, 157_288);
        assert_eq!(blocks[3].start_page + blocks[3].num_pages - 1, 209_716);
        assert_eq!(blocks[4].start_page, 209_717);
        assert_eq!(blocks[4].start_page + blocks[4].num_pages - 1, 262_144);

        let total: u32 = blocks.iter().map(|b| b.num_pages).sum();
        assert_eq!(
            total, p,
            "bead_id={TEST_BEAD_ID} case=large_db_total_coverage"
        );

        for (i, block) in blocks.iter().enumerate() {
            assert_eq!(
                block.index,
                u8::try_from(i).unwrap(),
                "bead_id={TEST_BEAD_ID} case=large_db_sequential_indices"
            );
        }
    }

    // 10,000 pages is still below K_MAX, so it stays a single block.
    #[test]
    fn test_partition_large_db_10000() {
        let blocks = partition_source_blocks(10_000).expect("10k pages should succeed");
        assert_eq!(
            blocks.len(),
            1,
            "bead_id={TEST_BEAD_ID} case=large_db_10k_single_block"
        );
        assert_eq!(blocks[0].num_pages, 10_000);
    }

    // A page count that does not divide evenly: coverage is exact,
    // no block exceeds K_MAX, sizes are balanced within one page, and
    // blocks are contiguous.
    #[test]
    fn test_partition_uneven() {
        let p = K_MAX * 3 + 7;
        let blocks = partition_source_blocks(p).expect("uneven split should succeed");

        let total: u32 = blocks.iter().map(|b| b.num_pages).sum();
        assert_eq!(
            total, p,
            "bead_id={TEST_BEAD_ID} case=uneven_total_coverage"
        );

        for block in &blocks {
            assert!(
                block.num_pages <= K_MAX,
                "bead_id={TEST_BEAD_ID} case=uneven_k_max block={}",
                block.index
            );
        }

        // Balanced partition: block sizes differ by at most one page.
        let max_k = blocks.iter().map(|b| b.num_pages).max().unwrap();
        let min_k = blocks.iter().map(|b| b.num_pages).min().unwrap();
        assert!(
            max_k - min_k <= 1,
            "bead_id={TEST_BEAD_ID} case=uneven_balanced max_k={max_k} min_k={min_k}"
        );

        for window in blocks.windows(2) {
            assert_eq!(
                window[1].start_page,
                window[0].start_page + window[0].num_pages,
                "bead_id={TEST_BEAD_ID} case=uneven_contiguous"
            );
        }
    }

    // Page 1 (the SQLite header page) always lands in the first block.
    #[test]
    fn test_partition_page1_special() {
        for p in [1_u32, 64, K_MAX, K_MAX + 1, 262_144] {
            let blocks = partition_source_blocks(p).expect("partition should succeed");
            assert!(
                !blocks.is_empty(),
                "bead_id={TEST_BEAD_ID} case=page1_nonempty p={p}"
            );
            assert_eq!(
                blocks[0].start_page, 1,
                "bead_id={TEST_BEAD_ID} case=page1_in_first_block p={p}"
            );
        }
    }

    // The largest partitionable database: exactly 256 blocks of K_MAX
    // pages each, with the final SBN at 255.
    #[test]
    fn test_partition_maximum_blocks() {
        let p_u64 = u64::from(K_MAX) * 256;
        assert!(
            u32::try_from(p_u64).is_ok(),
            "test precondition: fits in u32"
        );
        let p = u32::try_from(p_u64).unwrap();
        let blocks = partition_source_blocks(p).expect("max blocks should succeed");
        assert_eq!(
            blocks.len(),
            256,
            "bead_id={TEST_BEAD_ID} case=max_blocks_256"
        );

        for block in &blocks {
            assert_eq!(
                block.num_pages, K_MAX,
                "bead_id={TEST_BEAD_ID} case=max_blocks_each_k_max block={}",
                block.index
            );
        }

        assert_eq!(blocks[255].index, 255);

        let total: u64 = blocks.iter().map(|b| u64::from(b.num_pages)).sum();
        assert_eq!(
            total, p_u64,
            "bead_id={TEST_BEAD_ID} case=max_blocks_coverage"
        );
    }

    // One page beyond the maximum must be rejected (would need SBN 256).
    #[test]
    fn test_partition_overflow_too_many_pages() {
        let p_u64 = u64::from(K_MAX) * 256 + 1;
        if let Ok(p) = u32::try_from(p_u64) {
            let result = partition_source_blocks(p);
            assert!(
                result.is_err(),
                "bead_id={TEST_BEAD_ID} case=overflow_error p={p}"
            );
        }
    }

    // Property: block sizes always sum to the input page count.
    #[test]
    fn prop_partition_coverage() {
        for p in [
            1_u32,
            2,
            63,
            64,
            100,
            1000,
            K_MAX - 1,
            K_MAX,
            K_MAX + 1,
            K_MAX * 2,
            262_144,
            K_MAX * 100,
        ] {
            let blocks = partition_source_blocks(p).expect("partition should succeed");
            let total: u32 = blocks.iter().map(|b| b.num_pages).sum();
            assert_eq!(total, p, "bead_id={TEST_BEAD_ID} case=prop_coverage p={p}");
        }
    }

    // Property: partitioning is deterministic for the same input.
    #[test]
    fn prop_partition_deterministic() {
        for p in [1_u32, K_MAX, K_MAX + 1, 262_144] {
            let a = partition_source_blocks(p).expect("first call");
            let b = partition_source_blocks(p).expect("second call");
            assert_eq!(a, b, "bead_id={TEST_BEAD_ID} case=prop_deterministic p={p}");
        }
    }

    // Property: no block ever exceeds K_MAX pages.
    #[test]
    fn prop_partition_k_max_bounds() {
        for p in [K_MAX + 1, K_MAX * 2, K_MAX * 3 + 7, 262_144, K_MAX * 100] {
            let blocks = partition_source_blocks(p).expect("partition should succeed");
            for block in &blocks {
                assert!(
                    block.num_pages <= K_MAX,
                    "bead_id={TEST_BEAD_ID} case=prop_k_max_bound p={p} block={} num_pages={}",
                    block.index,
                    block.num_pages
                );
            }
        }
    }

    // Property: blocks tile pages 1..=p contiguously with no gaps.
    #[test]
    fn prop_partition_contiguous_non_overlapping() {
        for p in [K_MAX + 1, K_MAX * 5, 262_144] {
            let blocks = partition_source_blocks(p).expect("partition should succeed");
            for window in blocks.windows(2) {
                let end_prev = window[0].start_page + window[0].num_pages;
                assert_eq!(
                    window[1].start_page, end_prev,
                    "bead_id={TEST_BEAD_ID} case=prop_contiguous p={p}"
                );
            }
            assert_eq!(blocks[0].start_page, 1);
            let last = blocks.last().unwrap();
            assert_eq!(last.start_page + last.num_pages - 1, p);
        }
    }

    // Property: SBNs are sequential from 0 with no gaps.
    #[test]
    fn prop_partition_sequential_indices() {
        for p in [K_MAX + 1, K_MAX * 3, 262_144] {
            let blocks = partition_source_blocks(p).expect("partition should succeed");
            for (i, block) in blocks.iter().enumerate() {
                assert_eq!(
                    block.index,
                    u8::try_from(i).unwrap(),
                    "bead_id={TEST_BEAD_ID} case=prop_sequential_indices p={p}"
                );
            }
        }
    }

    // Compliance gate: pins the RFC 6330 constants and smoke-tests the
    // partition entry point on trivial inputs.
    #[test]
    fn test_bd_1hi_6_unit_compliance_gate() {
        assert_eq!(K_MAX, 56_403, "RFC 6330 K_max constant");
        assert_eq!(SBN_MAX, 255, "RFC 6330 SBN_max constant");
        assert_eq!(
            MAX_PARTITIONABLE_PAGES,
            56_403 * 256,
            "max partitionable pages"
        );

        let _ = partition_source_blocks(0);
        let _ = partition_source_blocks(1);
        let _ = partition_source_blocks(K_MAX);
    }

    // Structure compliance: coverage, per-block bounds (0 < size <= K_MAX),
    // and sequential indices across a spread of sizes up to the maximum.
    #[test]
    fn prop_bd_1hi_6_structure_compliance() {
        let test_sizes = [
            1_u32,
            100,
            K_MAX,
            K_MAX + 1,
            K_MAX * 2,
            K_MAX * 100,
            K_MAX * 256,
        ];
        for &p in &test_sizes {
            let blocks = partition_source_blocks(p).expect("valid partition");
            let total: u32 = blocks.iter().map(|b| b.num_pages).sum();
            assert_eq!(total, p);
            for b in &blocks {
                assert!(b.num_pages <= K_MAX);
                assert!(b.num_pages > 0);
            }
            for (i, b) in blocks.iter().enumerate() {
                assert_eq!(b.index, u8::try_from(i).unwrap());
            }
        }
    }
}