use fsqlite_error::{FrankenError, Result};
use tracing::{debug, info};
/// Identifier used to tag this module's tracing events and assertion messages.
const BEAD_ID: &str = "bd-1hi.6";

/// Largest number of pages a single source block may hold (RFC 6330 `K_max`).
pub const K_MAX: u32 = 56_403;

/// Largest source block number (RFC 6330 `SBN_max`), so indices span `0..=255`.
pub const SBN_MAX: u8 = 255;

/// Upper bound on the page count that can be partitioned: `K_MAX` pages in
/// each of the `SBN_MAX + 1` addressable source blocks.
// `as` is used because `u64::from` is not callable in a const initializer;
// both casts are widening and therefore lossless.
pub const MAX_PARTITIONABLE_PAGES: u64 = (K_MAX as u64) * (SBN_MAX as u64 + 1);
/// One contiguous run of database pages produced by the partitioner.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct SourceBlock {
    /// Zero-based position of this block within the partition.
    pub index: u8,
    /// Page number of the first page in the block; the first block starts at page 1.
    pub start_page: u32,
    /// Number of consecutive pages covered by the block.
    pub num_pages: u32,
}
/// Splits a database of `total_pages` pages into contiguous source blocks.
///
/// Page numbers are 1-based: the first block always starts at page 1.
///
/// * `total_pages == 0` yields an empty vector.
/// * `total_pages <= K_MAX` yields a single block covering every page.
/// * Otherwise the pages are cut into `ceil(total_pages / K_MAX)` blocks whose
///   sizes differ by at most one page, with the longer blocks emitted first.
///
/// # Errors
///
/// Returns [`FrankenError::OutOfRange`] when `total_pages` exceeds
/// [`MAX_PARTITIONABLE_PAGES`].
///
/// # Panics
///
/// Panics only if an internal invariant is broken (a block index that does not
/// fit in `u8`, or a page offset overflowing `u32`); the range check above is
/// what rules these out.
pub fn partition_source_blocks(total_pages: u32) -> Result<Vec<SourceBlock>> {
    if total_pages == 0 {
        debug!(bead_id = BEAD_ID, "empty database, no source blocks");
        return Ok(Vec::new());
    }

    // Compare in u64 because the limit constant is declared as u64.
    if u64::from(total_pages) > MAX_PARTITIONABLE_PAGES {
        return Err(FrankenError::OutOfRange {
            what: "total_pages".to_owned(),
            value: total_pages.to_string(),
        });
    }

    if total_pages <= K_MAX {
        info!(
            bead_id = BEAD_ID,
            total_pages, "single source block covers entire database"
        );
        let whole_database = SourceBlock {
            index: 0,
            start_page: 1,
            num_pages: total_pages,
        };
        return Ok(vec![whole_database]);
    }

    // Balanced split: `long_count` blocks of `long_len` pages followed by
    // `short_count` blocks of `short_len` pages.
    let block_count = total_pages.div_ceil(K_MAX);
    let long_len = total_pages.div_ceil(block_count);
    let short_len = total_pages / block_count;
    let long_count = total_pages - short_len * block_count;
    let short_count = block_count - long_count;

    // Field names are spelled out so the emitted event keeps its original keys.
    info!(
        bead_id = BEAD_ID,
        total_pages,
        z = block_count,
        k_l = long_len,
        k_s = short_len,
        z_l = long_count,
        z_s = short_count,
        "partitioned database into multiple source blocks"
    );

    let mut blocks = Vec::with_capacity(usize::try_from(block_count).unwrap_or(256));
    let mut next_page: u32 = 1;
    for sbn in 0..block_count {
        // The first `long_count` blocks take the larger size.
        let num_pages = if sbn < long_count { long_len } else { short_len };
        let index = u8::try_from(sbn).expect("SBN checked by MAX_PARTITIONABLE_PAGES guard");
        blocks.push(SourceBlock {
            index,
            start_page: next_page,
            num_pages,
        });
        next_page = next_page
            .checked_add(num_pages)
            .expect("offset overflow checked by MAX_PARTITIONABLE_PAGES guard");
    }

    // After the last block the cursor must sit one past the final page.
    debug_assert_eq!(
        next_page,
        total_pages + 1,
        "bead_id={BEAD_ID} partition coverage mismatch"
    );
    Ok(blocks)
}
#[cfg(test)]
mod tests {
use super::*;
const TEST_BEAD_ID: &str = "bd-1hi.6";
/// A zero-page database partitions successfully into no blocks at all.
#[test]
fn test_partition_empty_db() {
    let partition = partition_source_blocks(0).expect("empty db should succeed");
    let has_no_blocks = partition.is_empty();
    assert!(
        has_no_blocks,
        "bead_id={TEST_BEAD_ID} case=empty_db_no_blocks"
    );
}
/// A one-page database yields exactly one block holding page 1.
#[test]
fn test_partition_single_page() {
    let partition = partition_source_blocks(1).expect("single page should succeed");
    assert_eq!(
        partition.len(),
        1,
        "bead_id={TEST_BEAD_ID} case=single_page_one_block"
    );

    let only = &partition[0];
    assert_eq!(only.index, 0);
    assert_eq!(only.start_page, 1);
    assert_eq!(only.num_pages, 1);
}
/// A 64-page database fits in a single block starting at page 1.
#[test]
fn test_partition_small_db() {
    let partition = partition_source_blocks(64).expect("64 pages should succeed");
    assert_eq!(
        partition.len(),
        1,
        "bead_id={TEST_BEAD_ID} case=small_db_single_block"
    );

    let only = &partition[0];
    assert_eq!(only.index, 0);
    assert_eq!(only.start_page, 1);
    assert_eq!(only.num_pages, 64);
}
/// A 100-page database is covered by one block of 100 pages.
#[test]
fn test_partition_small_db_100() {
    let partition = partition_source_blocks(100).expect("100 pages should succeed");
    assert_eq!(
        partition.len(),
        1,
        "bead_id={TEST_BEAD_ID} case=single_block_p100"
    );

    let only = &partition[0];
    assert_eq!(only.start_page, 1);
    assert_eq!(only.num_pages, 100);
}
/// Exactly `K_MAX` pages is the largest size that still yields a single block.
#[test]
fn test_partition_boundary_exactly_k_max() {
    let partition = partition_source_blocks(K_MAX).expect("exactly K_MAX should succeed");
    assert_eq!(
        partition.len(),
        1,
        "bead_id={TEST_BEAD_ID} case=boundary_exactly_k_max"
    );

    let only = &partition[0];
    assert_eq!(only.index, 0);
    assert_eq!(only.start_page, 1);
    assert_eq!(only.num_pages, K_MAX);
}
/// One page past `K_MAX` forces a split into two adjacent blocks.
#[test]
fn test_partition_two_blocks() {
    let p = K_MAX + 1;
    let blocks = partition_source_blocks(p).expect("K_MAX+1 should succeed");
    assert_eq!(blocks.len(), 2, "bead_id={TEST_BEAD_ID} case=two_blocks");

    let (head, tail) = (&blocks[0], &blocks[1]);
    assert_eq!(head.index, 0);
    assert_eq!(head.start_page, 1);
    assert_eq!(tail.index, 1);

    // Together the blocks account for every page.
    let total: u32 = blocks.iter().map(|block| block.num_pages).sum();
    assert_eq!(total, p, "bead_id={TEST_BEAD_ID} case=two_blocks_coverage");

    // No block exceeds the per-block limit.
    blocks.iter().for_each(|block| {
        assert!(
            block.num_pages <= K_MAX,
            "bead_id={TEST_BEAD_ID} case=two_blocks_k_max block={}",
            block.index
        );
    });

    // The second block begins right where the first one ends.
    assert_eq!(tail.start_page, head.start_page + head.num_pages);
}
/// Worked example: 262,144 pages split into four blocks of 52,429 pages
/// followed by one block of 52,428 pages, with exact page ranges checked.
#[test]
fn test_partition_large_db_worked_example() {
    let p = 262_144_u32;
    let blocks = partition_source_blocks(p).expect("large db should succeed");
    assert_eq!(
        blocks.len(),
        5,
        "bead_id={TEST_BEAD_ID} case=large_db_5_blocks"
    );

    let expected_k_l = 52_429_u32;
    let expected_k_s = 52_428_u32;
    let (long_blocks, short_blocks) = blocks.split_at(4);
    for block in long_blocks {
        assert_eq!(
            block.num_pages, expected_k_l,
            "bead_id={TEST_BEAD_ID} case=large_db_k_l_block block={}",
            block.index
        );
    }
    assert_eq!(
        short_blocks[0].num_pages, expected_k_s,
        "bead_id={TEST_BEAD_ID} case=large_db_k_s_block"
    );

    // Inclusive (first page, last page) range expected for each block, in order.
    let expected_ranges: [(u32, u32); 5] = [
        (1, 52_429),
        (52_430, 104_858),
        (104_859, 157_287),
        (157_288, 209_716),
        (209_717, 262_144),
    ];
    for (block, (first_page, last_page)) in blocks.iter().zip(expected_ranges) {
        assert_eq!(block.start_page, first_page);
        assert_eq!(block.start_page + block.num_pages - 1, last_page);
    }

    let total: u32 = blocks.iter().map(|block| block.num_pages).sum();
    assert_eq!(
        total, p,
        "bead_id={TEST_BEAD_ID} case=large_db_total_coverage"
    );

    blocks.iter().enumerate().for_each(|(position, block)| {
        assert_eq!(
            block.index,
            u8::try_from(position).unwrap(),
            "bead_id={TEST_BEAD_ID} case=large_db_sequential_indices"
        );
    });
}
/// 10,000 pages is still below `K_MAX`, so a single block is expected.
#[test]
fn test_partition_large_db_10000() {
    let partition = partition_source_blocks(10_000).expect("10k pages should succeed");
    assert_eq!(
        partition.len(),
        1,
        "bead_id={TEST_BEAD_ID} case=large_db_10k_single_block"
    );

    let only = &partition[0];
    assert_eq!(only.num_pages, 10_000);
}
/// A page count that does not divide evenly still gives full coverage,
/// per-block sizes within `K_MAX`, balanced sizes, and contiguous ranges.
#[test]
fn test_partition_uneven() {
    let p = K_MAX * 3 + 7;
    let blocks = partition_source_blocks(p).expect("uneven split should succeed");

    let total: u32 = blocks.iter().map(|block| block.num_pages).sum();
    assert_eq!(
        total, p,
        "bead_id={TEST_BEAD_ID} case=uneven_total_coverage"
    );

    blocks.iter().for_each(|block| {
        assert!(
            block.num_pages <= K_MAX,
            "bead_id={TEST_BEAD_ID} case=uneven_k_max block={}",
            block.index
        );
    });

    // Block sizes may differ by at most one page.
    let max_k = blocks.iter().map(|block| block.num_pages).max().unwrap();
    let min_k = blocks.iter().map(|block| block.num_pages).min().unwrap();
    assert!(
        max_k - min_k <= 1,
        "bead_id={TEST_BEAD_ID} case=uneven_balanced max_k={max_k} min_k={min_k}"
    );

    // Each block starts exactly where its predecessor ends.
    for (prev, next) in blocks.iter().zip(blocks.iter().skip(1)) {
        assert_eq!(
            next.start_page,
            prev.start_page + prev.num_pages,
            "bead_id={TEST_BEAD_ID} case=uneven_contiguous"
        );
    }
}
/// For every non-empty size tried, the first block begins at page 1.
#[test]
fn test_partition_page1_special() {
    let sizes = [1_u32, 64, K_MAX, K_MAX + 1, 262_144];
    for p in sizes {
        let blocks = partition_source_blocks(p).expect("partition should succeed");
        let nonempty = !blocks.is_empty();
        assert!(
            nonempty,
            "bead_id={TEST_BEAD_ID} case=page1_nonempty p={p}"
        );

        let first = &blocks[0];
        assert_eq!(
            first.start_page, 1,
            "bead_id={TEST_BEAD_ID} case=page1_in_first_block p={p}"
        );
    }
}
/// The largest supported database (`K_MAX * 256` pages) splits into 256
/// blocks of exactly `K_MAX` pages each.
#[test]
fn test_partition_maximum_blocks() {
    let p_u64 = u64::from(K_MAX) * 256;
    // A single checked conversion replaces the former `is_ok()` assert followed
    // by `unwrap()`; the precondition message is kept.
    let p = u32::try_from(p_u64).expect("test precondition: fits in u32");

    let blocks = partition_source_blocks(p).expect("max blocks should succeed");
    assert_eq!(
        blocks.len(),
        256,
        "bead_id={TEST_BEAD_ID} case=max_blocks_256"
    );
    for block in &blocks {
        assert_eq!(
            block.num_pages, K_MAX,
            "bead_id={TEST_BEAD_ID} case=max_blocks_each_k_max block={}",
            block.index
        );
    }
    assert_eq!(blocks[255].index, 255);

    // Summed in u64 to match the width of `p_u64`.
    let total: u64 = blocks.iter().map(|b| u64::from(b.num_pages)).sum();
    assert_eq!(
        total, p_u64,
        "bead_id={TEST_BEAD_ID} case=max_blocks_coverage"
    );
}
/// Page counts above the supported maximum are rejected with
/// `FrankenError::OutOfRange`.
#[test]
fn test_partition_overflow_too_many_pages() {
    // One page past the limit. The conversion is asserted rather than guarded
    // with `if let`, so the test can never pass without exercising the check.
    let p_u64 = u64::from(K_MAX) * 256 + 1;
    let just_over = u32::try_from(p_u64).expect("test precondition: fits in u32");

    // Also probe the far end of the input domain.
    for p in [just_over, u32::MAX] {
        let result = partition_source_blocks(p);
        assert!(
            matches!(result, Err(FrankenError::OutOfRange { .. })),
            "bead_id={TEST_BEAD_ID} case=overflow_error p={p}"
        );
    }
}
/// For a spread of sizes, the block sizes always sum to the total page count.
#[test]
fn prop_partition_coverage() {
    let sizes = [
        1_u32,
        2,
        63,
        64,
        100,
        1000,
        K_MAX - 1,
        K_MAX,
        K_MAX + 1,
        K_MAX * 2,
        262_144,
        K_MAX * 100,
    ];
    for p in sizes {
        let blocks = partition_source_blocks(p).expect("partition should succeed");
        let total: u32 = blocks.iter().map(|block| block.num_pages).sum();
        assert_eq!(total, p, "bead_id={TEST_BEAD_ID} case=prop_coverage p={p}");
    }
}
/// Partitioning the same page count twice produces identical results.
#[test]
fn prop_partition_deterministic() {
    let sizes = [1_u32, K_MAX, K_MAX + 1, 262_144];
    for p in sizes {
        let first_run = partition_source_blocks(p).expect("first call");
        let second_run = partition_source_blocks(p).expect("second call");
        assert_eq!(
            first_run, second_run,
            "bead_id={TEST_BEAD_ID} case=prop_deterministic p={p}"
        );
    }
}
/// In multi-block partitions no block ever holds more than `K_MAX` pages.
#[test]
fn prop_partition_k_max_bounds() {
    let sizes = [K_MAX + 1, K_MAX * 2, K_MAX * 3 + 7, 262_144, K_MAX * 100];
    for p in sizes {
        let blocks = partition_source_blocks(p).expect("partition should succeed");
        blocks.iter().for_each(|block| {
            assert!(
                block.num_pages <= K_MAX,
                "bead_id={TEST_BEAD_ID} case=prop_k_max_bound p={p} block={} num_pages={}",
                block.index,
                block.num_pages
            );
        });
    }
}
/// Blocks tile the page range `1..=p` with no gaps and no overlaps.
#[test]
fn prop_partition_contiguous_non_overlapping() {
    let sizes = [K_MAX + 1, K_MAX * 5, 262_144];
    for p in sizes {
        let blocks = partition_source_blocks(p).expect("partition should succeed");

        // Every block must begin on the page right after its predecessor's last page.
        for (prev, next) in blocks.iter().zip(blocks.iter().skip(1)) {
            let end_prev = prev.start_page + prev.num_pages;
            assert_eq!(
                next.start_page, end_prev,
                "bead_id={TEST_BEAD_ID} case=prop_contiguous p={p}"
            );
        }

        // The tiling starts at page 1 and finishes on page `p`.
        assert_eq!(blocks[0].start_page, 1);
        let tail = blocks.last().unwrap();
        assert_eq!(tail.start_page + tail.num_pages - 1, p);
    }
}
/// Block indices count up from zero in emission order.
#[test]
fn prop_partition_sequential_indices() {
    let sizes = [K_MAX + 1, K_MAX * 3, 262_144];
    for p in sizes {
        let blocks = partition_source_blocks(p).expect("partition should succeed");
        blocks.iter().enumerate().for_each(|(position, block)| {
            assert_eq!(
                block.index,
                u8::try_from(position).unwrap(),
                "bead_id={TEST_BEAD_ID} case=prop_sequential_indices p={p}"
            );
        });
    }
}
/// Compliance gate: pins the public constants and checks that the basic
/// in-range inputs partition successfully.
#[test]
fn test_bd_1hi_6_unit_compliance_gate() {
    assert_eq!(K_MAX, 56_403, "RFC 6330 K_max constant");
    assert_eq!(SBN_MAX, 255, "RFC 6330 SBN_max constant");
    assert_eq!(
        MAX_PARTITIONABLE_PAGES,
        56_403 * 256,
        "max partitionable pages"
    );

    // Previously the results were discarded with `let _ =`, so an `Err`
    // regression on these in-range inputs would have gone unnoticed.
    for pages in [0, 1, K_MAX] {
        assert!(
            partition_source_blocks(pages).is_ok(),
            "partition_source_blocks({pages}) must succeed"
        );
    }
}
/// Structural invariants over a range of sizes up to the supported maximum:
/// full coverage, non-empty blocks within `K_MAX`, and sequential indices.
#[test]
fn prop_bd_1hi_6_structure_compliance() {
    let test_sizes = [
        1_u32,
        100,
        K_MAX,
        K_MAX + 1,
        K_MAX * 2,
        K_MAX * 100,
        K_MAX * 256,
    ];
    for pages in test_sizes {
        let blocks = partition_source_blocks(pages).expect("valid partition");

        let total: u32 = blocks.iter().map(|block| block.num_pages).sum();
        assert_eq!(total, pages);

        for block in &blocks {
            assert!(block.num_pages <= K_MAX);
            assert!(block.num_pages > 0);
        }

        blocks.iter().enumerate().for_each(|(position, block)| {
            assert_eq!(block.index, u8::try_from(position).unwrap());
        });
    }
}
}